Building a Home Server

In this article, I’ll document my process of building a home server – or NAS – for local storage, smb drives, backups, processing, git, CD-rips, and other headless computing.

Why a Home Server?

The necessity of a NAS these days can be questioned, given the cheap – or even free – availability of cloud storage. However, depending on an external vendor poses significant risks for individuals, who rarely enjoy the same benefits a corporate account with big bills might receive, notably –

  • Changes in storage tiers and costs (see Flickr limiting free users to 1,000 photos, down from the previous 1 TB, or Microsoft dropping unlimited storage for OneDrive – especially fun if you already have gigabytes, if not terabytes, stored there)
  • Upload capacity on consumer broadband connections (except for Google Fiber, no American ISP offers symmetric up/down links) – uploading my CD and vinyl collection, even ripped as compressed MP3s at ~70 GB, would easily take me half a day (not to mention my 2 TB Steam collection), and that is only a fraction of our available local storage
  • And hence, high-speed backups from all local devices, such as PCs, Macs, Routers, etc.
  • Privacy concerns – while you can, naturally, upload encrypted files, you lose the benefit of automatic syncing in most cases
  • Flexibility to run additional services such as a pi-hole (DNS ad blocker), monitoring software, local build server and git repositories, or VPN services
  • Service outages on cloud services or your local internet

However, for the record – the server you are reading this article on does get backed up to AWS S3 via an encrypted duplicity backup (which is neither free nor particularly user-friendly). There is no “one size fits all” solution.

As a preliminary note: Do not copy paste any of the commands without making sure all parameters match your system’s configuration!


  • CPU: AMD – Ryzen 3 2200G 3.5 GHz Quad-Core Processor  
  • Motherboard: Gigabyte – GA-AB350-GAMING 3 ATX AM4 Motherboard
  • Memory: 2x Corsair – Vengeance LPX 8 GB (1 x 8 GB) DDR4-2400 Memory
  • Storage: Western Digital – Green 240 GB M.2-2280 Solid State Drive  
  • Storage: 2x Western Digital – Red 6 TB 3.5″ 5400RPM Internal Hard Drive  
  • Storage: 2x Western Digital – Red 3 TB 3.5″ 5400RPM Internal Hard Drive    
  • Power Supply: EVGA – SuperNOVA G3 550 W 80+ Gold Certified Fully-Modular ATX Power Supply
  • Case Fan: 2x Noctua – NF-F12 PWM 54.97 CFM 120mm Fan
  • Case Fan: 2x Noctua – NF-R8 redux-1800 PWM 31.37 CFM 80mm Fan  
  • Case: Rosewill RSV-R4000, 4U Rackmount Server Case / Server Chassis, 8

The total price (as of 2019-04-06) is roughly $1,200 USD.

I’ve also added some existing hardware, like 2x 1TB Samsung HDDs and a PCI-to-SATA extension card.


Make sure you have the following tools available –

  • A laptop or desktop running GNU/Linux or macOS (Windows instructions differ – you will need an SSH client like PuTTY or MobaXterm, as well as UNetbootin to burn the ISO)
  • An ethernet cable
  • A keyboard
  • A screwdriver
  • A USB thumb drive
  • All server hardware
  • (Optional) Small LCD screen and spare keyboard to access the machine if SSH won’t work for some reason

First, we’ll get ourselves a Debian 9 ISO and burn it to a USB thumb drive. Other operating systems are available – I like Debian for its stability and easy maintenance, and I already run several Debian boxes.

cd ~/Downloads
sudo fdisk -l | grep -B 2 -A 15 "Flash"
sudo dd bs=4M if=debian-9.8.0-amd64-netinst.iso of=/dev/sdf
73+0 records in
73+0 records out
306184192 bytes (306 MB, 292 MiB) copied, 11.6768 s, 26.2 MB/s
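Before writing the image, it’s worth verifying the downloaded ISO against the checksums Debian publishes alongside it. A minimal sketch in Python (the comparison value is a placeholder – take the real one from the SHA256SUMS file on your mirror):

```python
import hashlib

def sha256sum(path, chunk_size=1 << 20):
    """Compute the SHA-256 hash of a file in chunks to keep memory use flat."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Compare against the value from Debian's SHA256SUMS file (placeholder here):
# expected = "..."
# assert sha256sum("debian-9.8.0-amd64-netinst.iso") == expected
```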

Hardware setup

This step is fairly straightforward. Assemble the hardware, plug it in, and make sure it POSTs. In the process, make sure the room you are using to assemble the system looks like a tornado went through it.

Cable management optional

Install Operating System

With your hardware connected to a screen, keyboard, and ethernet, it’s time to install the OS. Insert the flash drive, boot into it on the server hardware, and install Debian. I used my M.2 SSD with separate /home and /tmp partitions and enabled disk encryption with LUKS.

Software setup

Once the installation is completed, we can set up the software.

First, we need to configure our network. Take a backup of the configuration and edit the file with vi:

sudo cp /etc/network/interfaces /etc/network/interfaces.bak
sudo vi /etc/network/interfaces

Your interfaces file should contain something like this:

auto lo
iface lo inet loopback

iface eth0 inet static

sudo vi /etc/resolv.conf

Keep in mind that your IP configuration might be different. Since I am running a pi-hole on IP, my DNS configuration refers to that server. For your environment, you probably want to go with and as DNS tuple. You can always check on your other devices via $ ifconfig or similar commands.

Next, we’ll configure the hostname and host resolution and restart the network service.

My hostname is “bigiron”, which might or might not be a Marty Robbins and Fallout reference. You can choose whatever you like.

echo bigiron | sudo tee /etc/hostname

sudo vi /etc/hosts
# Example:
#   localhost
#   bigiron

Before doing anything else, we’ll restart the network service and test the connection. Make sure you can reach an external host.

sudo service networking restart
if [[ $(hostname) == "bigiron" ]]; then echo "success"; else echo "failed"; fi

Then, we’ll update our packages and install all required software:

sudo apt-get update
sudo apt-get upgrade
sudo apt-get install zsh \
vim \
git \
openssh-server \
parted \
wget \
curl \
cryptsetup \
mdadm \
libcups2 samba samba-common cups 

The software we install here is fairly standard – zsh is going to be our shell, vim serves as editor, git handles repositories, openssh-server provides SSH access, wget and curl are used for downloads and network tests, cryptsetup and mdadm take care of disk encryption and RAID arrays, and samba plus its dependencies provide file shares.

Disclaimer: There is a known vulnerability for wget, CVE-2019-5953.


Next, we’ll configure oh-my-zsh, a neat little wrapper around zsh that is my default shell on all *NIX boxes.

sh -c "$(wget -O -)"

Now, we need to configure ssh so we can use the system remotely from our other machines.

sudo vim /etc/ssh/sshd_config

Add the following content (using your user name) and restart ssh. Please make sure you configure public/private key authentication in due time.

# Users
AllowUsers christian
# Logging
SyslogFacility AUTH
LogLevel INFO
# Authentication:
LoginGraceTime 2m
PermitRootLogin no
StrictModes yes
MaxAuthTries 6
MaxSessions 10
service ssh restart
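To double-check the result, you can pull the options back out of the file with a few lines of Python – a simplified parser operating on the sample settings above (real sshd_config semantics, such as Match blocks, are more involved):

```python
SAMPLE = """\
# Authentication:
LoginGraceTime 2m
PermitRootLogin no
MaxAuthTries 6
"""

def parse_sshd_config(text):
    """Parse simple 'Key value' lines, skipping comments and blank lines."""
    options = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition(" ")
        options[key] = value.strip()
    return options

opts = parse_sshd_config(SAMPLE)
assert opts["PermitRootLogin"] == "no", "root login should be disabled!"
```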

Hard drive setup

Congratulations, your system is now operational. Granted – it doesn’t do anything yet. But we’ll change that in a jiffy!

You can now connect to your system using SSH (which is what we are going to do).

One of our most common commands is going to be the following:

sudo fdisk -l 
Disk /dev/sdh: 5.5 TiB, 6001175126016 bytes, 11721045168 sectors
Units: sectors of 1 * 512 = 512 bytes
Sector size (logical/physical): 512 bytes / 4096 bytes
I/O size (minimum/optimal): 4096 bytes / 4096 bytes
Disklabel type: gpt
Disk identifier: 46AAB764-3004-481A-AA2D-5D751D996AC1

Device     Start         End     Sectors  Size Type
/dev/sdh1   2048 11721045134 11721043087  5.5T Linux filesystem

This shows us the physical disks in the system, as well as their partitions. As mentioned before – do not simply copy-paste these commands; otherwise you risk losing data and messing up your system.

Make sure you always take note of your disks’ identifiers (e.g., /dev/sda) and UUIDs.
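If you want to pull device names and raw sizes out of `fdisk -l` programmatically, a small parse over its output might look like this (a hypothetical helper, matched against the output format shown above):

```python
import re

# Sample line taken from the fdisk output above
FDISK_SAMPLE = "Disk /dev/sdh: 5.5 TiB, 6001175126016 bytes, 11721045168 sectors"

def parse_disks(fdisk_output):
    """Extract (device, size_in_bytes) pairs from `fdisk -l` output."""
    pattern = re.compile(r"^Disk (/dev/\S+): .*?, (\d+) bytes", re.MULTILINE)
    return [(dev, int(size)) for dev, size in pattern.findall(fdisk_output)]

print(parse_disks(FDISK_SAMPLE))  # → [('/dev/sdh', 6001175126016)]
```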


With that out of the way, we will need to add partitions to our first set of hard drives and format them. This will delete all data on the disks.

sudo parted /dev/sdb
(parted) mklabel gpt                                                      
Warning: The existing disk label on /dev/sdb will be destroyed and all data on this disk will be lost. Do you want to continue?
Yes/No? y                                                                 
(parted) mkpart primary ext4 0% 100%                                  
(parted) quit         
sudo mke2fs -t ext4 /dev/sdb1

Once that is done, we need to create our RAID-1 array to protect ourselves against disk failures. Keep in mind: a RAID-1 array protects against drive failures – it is not a backup!

sudo mdadm --create --verbose --level=1 --metadata=1.2 --raid-devices=2 /dev/md/6TB /dev/sdb1 /dev/sdc1

You can check the configuration as well as the sync status of your RAID-1 array as follows:

➜  ~ sudo mdadm --detail /dev/md127                                                                          
        Version : 1.2
  Creation Time : Sat Apr  6 18:27:22 2019
     Raid Level : raid1
     Array Size : 5860390464 (5588.90 GiB 6001.04 GB)
  Used Dev Size : 5860390464 (5588.90 GiB 6001.04 GB)
   Raid Devices : 2
  Total Devices : 2
    Persistence : Superblock is persistent

  Intent Bitmap : Internal

    Update Time : Sat Apr  6 18:27:22 2019
          State : clean, resyncing 
 Active Devices : 2
Working Devices : 2
 Failed Devices : 0
  Spare Devices : 0

  Resync Status : 0% complete

           Name : bigiron:6TB  (local to host bigiron)
           UUID : ffafd947:3c116230:a1e40521:332156c7
         Events : 1

    Number   Major   Minor   RaidDevice State
       0       8       17        0      active sync   /dev/sdb1
       1       8       33        1      active sync   /dev/sdc1
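As a sanity check on those numbers: mdadm reports the array size in 1 KiB blocks, and since RAID-1 mirrors data, the usable space equals the smallest member disk. A quick back-of-the-envelope calculation:

```python
def array_size_gb(size_kib):
    """mdadm reports the array size in 1 KiB blocks; convert to decimal GB."""
    return round(size_kib * 1024 / 1e9, 2)

def raid1_usable_kib(member_sizes_kib):
    """RAID-1 mirrors data, so usable space equals the smallest member disk."""
    return min(member_sizes_kib)

print(array_size_gb(5860390464))  # → 6001.04, matching the "6001.04 GB" above
```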

Encryption with LUKS

Then, we’ll add LUKS encryption to the array:

sudo cryptsetup luksFormat /dev/md127
sudo cryptsetup luksOpen /dev/md127 6TB_LUKS
sudo mkfs.ext4 /dev/mapper/6TB_LUKS
sudo mkdir /mnt/6TB
sudo mount /dev/mapper/6TB_LUKS /mnt/6TB

Once that is done, we’ll add a key file so we don’t need to punch in the passphrase every time.

For this step, I am using an external USB thumb drive and hence am mounting /dev/sdd1 to /mnt/usb. You can also keep the key file on the system itself, but that would defeat the point of having an encrypted disk in the case of theft.

sudo mount /dev/sdd1 /mnt/usb
sudo dd bs=512 count=4 if=/dev/random of=/mnt/usb/keyfile iflag=fullblock
4+0 records in
4+0 records out
2048 bytes (2.0 kB, 2.0 KiB) copied, 891.521 s, 0.0 kB/s
sudo chmod 600 /mnt/usb/keyfile
sudo cryptsetup luksAddKey /dev/md127 /mnt/usb/keyfile --key-slot 1
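The dd invocation reads 2,048 bytes from /dev/random – which, as the timing above shows, can be painfully slow; /dev/urandom is generally fine for key material on modern kernels. An equivalent sketch in Python (the output path is commented out – adjust it to your own mount):

```python
import os

KEY_BYTES = 512 * 4  # same amount of data as `dd bs=512 count=4`

def write_keyfile(path, n_bytes=KEY_BYTES):
    """Write n_bytes of CSPRNG output to path and restrict it to the owner."""
    with open(path, "wb") as f:
        f.write(os.urandom(n_bytes))
    os.chmod(path, 0o600)  # equivalent to `chmod 600`
    return os.path.getsize(path)

# write_keyfile("/mnt/usb/keyfile")  # path from above; adjust to your mount
```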

Wonderful. Now, let’s make sure that the thumb drive containing the key gets mounted on boot by editing /etc/fstab

sudo fdisk -l
sudo blkid /dev/sdd1 | awk -F'"' '{print $2}'
sudo vim /etc/fstab
UUID=b796c692-2ba5-49cf-a45d-dbd62228dd98	/mnt/usb	auto nosuid,nodev,nofail 0 0

Then, we need to add the actual encrypted disk to /etc/fstab as well as /etc/crypttab.

# Get UUID
sudo cryptsetup luksDump /dev/md127 | grep "UUID" 

# Edit /etc/crypttab
sudo  mdadm --detail /dev/md127
sudo vim /etc/crypttab 
# 6TB_LUKS UUID=e4cd8646-375d-4d84-886c-ba80b31698ad /mnt/usb/keyfile luks
sudo vim /etc/fstab
# /dev/mapper/6TB_LUKS  /mnt/6TB    ext4    defaults        0       2

In my case, I have only enabled this for the data drives, not for the boot drive. This is a matter of personal preference – I have a small 7″ LCD and a keyboard on the 19″ rack and simply punch in the LUKS passphrase on reboot. This won’t work if you are not home, but leaving the thumb drive in the system while you are away won’t help theft prevention either. Feel free to send me your own best practices.

With all that done, you can test the setup by mounting everything in /etc/fstab by running:

sudo mount -a

Repeat this process for every RAID array in the system. In my case, that would be two.

Furthermore, to persist names for existing arrays, follow these steps:

sudo mdadm --stop /dev/md126
sudo mdadm --assemble /dev/md3 --name=3TB --update=name /dev/sda /dev/sdb
sudo mdadm -Db /dev/md3 >> /etc/mdadm/mdadm.conf
sudo update-initramfs -u

On a side note – if you have existing drives, for instance from your desktop PC, make sure to back up all your data to a different disk, as setting this up will delete all data. You can do that as follows:

sudo rsync --partial  -T /mnt/6TB/tmp -azh /mnt/3TB/ /mnt/6TB/ --exclude=".Trash-1000" --progress

On a related note: since I transferred an existing RAID-1 array, I ran into some massive drive issues that resulted in odd behavior (like timeouts on the read() call in the kernel, as observed via strace – please don’t ask me why). If you want to debug issues like these, the following commands might come in handy.

You can see the files being accessed by commands like rsync or cp through /proc with the specific PID of a process:

ps aux | grep rsync
watch sudo ls -l /proc/1446/fd 
Every 2.0s: sudo ls -l /proc/1446/fd                                                                                                                                                                       bigiron: Thu Apr 11 17:36:55 2019

total 0
lrwx------ 1 root root 64 Apr 11 17:03 0 -> /dev/tty1
lrwx------ 1 root root 64 Apr 11 17:03 1 -> /dev/tty1
lrwx------ 1 root root 64 Apr 11 17:03 2 -> /dev/tty1
lr-x------ 1 root root 64 Apr 11 17:24 3 -> /mnt/3TB/memes/2019/04/memes_if_i_die_before_game_of_thrones_comes_out/928_meme.jpg
lrwx------ 1 root root 64 Apr 11 17:03 4 -> socket:[18863]
lrwx------ 1 root root 64 Apr 11 17:03 5 -> socket:[18864]

Also, using strace, you can debug very odd, low-level issues like mine:

strace cp broken.txt broken2.txt
execve("/bin/cp", ["cp", "broken.txt", "broken2.txt"], [/* 24 vars */]) = 0
brk(NULL)                               = 0x55adfb96b000
access("/etc/", F_OK)      = -1 ENOENT (No such file or directory)
access("/etc/", R_OK)      = -1 ENOENT (No such file or directory)
open("/etc/", O_RDONLY|O_CLOEXEC) = 3
fstat(3, {st_mode=S_IFREG|0644, st_size=35999, ...}) = 0
mmap(NULL, 35999, PROT_READ, MAP_PRIVATE, 3, 0) = 0x7f82ba539000
close(3)                                = 0
access("/etc/", F_OK)      = -1 ENOENT (No such file or directory)
open("/lib/x86_64-linux-gnu/", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\0\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0000k\0\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0644, st_size=155400, ...}) = 0
mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f82ba537000
mmap(NULL, 2259664, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x7f82ba0f7000
mprotect(0x7f82ba11c000, 2093056, PROT_NONE) = 0
mmap(0x7f82ba31b000, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x24000) = 0x7f82ba31b000
mmap(0x7f82ba31d000, 6864, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_ANONYMOUS, -1, 0) = 0x7f82ba31d000
close(3)                                = 0

Network shares

Now, we’ll set up network shares with samba. This will allow all devices in the network to access shares on the server.

All we need to do is add a new user for samba (as smb users are separate from the host system’s users), back up our samba configuration, and add shares, including valid users, as follows:

sudo smbpasswd -a christian 
sudo cp /etc/samba/smb.conf /etc/samba/smb.conf.bak
sudo vim /etc/samba/smb.conf
sudo service smbd restart
# Excerpt from smb.conf – global settings:
  server min protocol = NT1
  ; server max protocol = SMB3

# Share definition (under its own [share-name] section):
path = /mnt/6TB
browseable = yes
writeable = yes
valid users = christian

Test your setup by mounting the smb drive on a different computer. The below example is for Arch Linux. Replace the username and password accordingly.

sudo pacman -S smbclient
sudo mkdir -p /mnt/shares/6TB-Server

Add a credentials file, readable only by root, to store your share passwords:

sudo mkdir -p /etc/samba/credentials/
sudo vim /etc/samba/credentials/share
sudo chown root:root /etc/samba/credentials
sudo chmod 700 /etc/samba/credentials
sudo chmod 600 /etc/samba/credentials/share

And try connecting:

sudo mount -t cifs // /mnt/shares/6TB-Server -o credentials=/etc/samba/credentials/share
Mounted drive on Arch Linux with KDE

I hate to admit it, but Windows makes it a bit easier. 🙂 Please see below for the steps (for Windows 10 Pro):

Right-click on “This PC” and “Map network drive”
Map Drive
Mapped Drive (the size might be off)


Last but not least, we’ll enable automatic backups for our GNU/Linux machines.

This is done with an encrypted duplicity backup. Make sure you store your GPG keys somewhere secure, like an encrypted USB drive.

sudo su
apt-get install duplicity gpg
gpg --gen-key
gpg --list-keys
# Take the GPG_KEY from here
touch /root/
chmod +x /root/
vim /root/
#!/bin/bash
# Set your GPG key ID (from `gpg --list-keys`) and the backup target
GPG_KEY=
SHARE=/mnt/shares/6TB-Server

if [ "${EUID}" -ne 0 ]; then
  echo "You're not root"
  exit 1
fi

# Check if the external share is mounted; try to mount it if it isn't
#MOUNTPOINT=$(findmnt -rn -S UUID="${UUID}")
if [[ -z $(mount -t cifs | grep "${SHARE}") ]]; then
  echo "Disk not connected, mounting..."
  mount -t cifs // /mnt/shares/6TB-Server -o credentials=/etc/samba/credentials/share
  # Check success
  if [ $? -ne 0 ]; then
    echo "Mount failed!"
    exit 1
  fi
fi
echo "Disk available, continuing..."

# Delete old backups, e.g. anything older than a week
#duplicity remove-older-than 7D --force --encrypt-key=9CE17035 --sign-key=9CE17035 file:///mnt/5TB/arch_backup/

# Make the regular backup
# Will be a full backup if past the --full-if-older-than threshold
duplicity --full-if-older-than 7D --encrypt-key=${GPG_KEY} --sign-key=${GPG_KEY} \
  --exclude=/proc --exclude=/lost+found --exclude=/backups --exclude=/mnt \
  --exclude=/sys --exclude=/opt/virtual / file:///mnt/shares/6TB-Server/backups/arch/

You can now schedule this with cron (the below entry will run it every day at midnight – make sure the /var/log/backup directory exists):

crontab -e
0 0 * * * /root/ > /var/log/backup/$(date --iso)_backup.log
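To double-check what `0 0 * * *` actually matches, here is a toy matcher for the five cron fields (it supports only `*` and plain numbers – real cron also handles ranges, lists, and steps):

```python
from datetime import datetime

def cron_matches(spec, when):
    """Check a 5-field cron spec (minute hour day month weekday) against a datetime."""
    fields = spec.split()
    # cron weekdays run 0 (Sunday) through 6 (Saturday)
    values = [when.minute, when.hour, when.day, when.month, when.isoweekday() % 7]
    return all(f == "*" or int(f) == v for f, v in zip(fields, values))

print(cron_matches("0 0 * * *", datetime(2019, 4, 11, 0, 0)))   # → True  (midnight)
print(cron_matches("0 0 * * *", datetime(2019, 4, 11, 12, 30))) # → False (midday)
```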

Further Steps

There are a lot more steps that you can take to make your server fully operational.

Here are some ideas –

  • Plex or other home media shares to access your media on, e.g., your Apple TV
  • Running your own git server
  • Running your own build server
  • Running Nextcloud
  • Enabling WireGuard or other VPNs for external access
  • Enabling rotating logs and log backups
  • Enabling a duplicity backup pipeline to an external hard drive to safeguard against fires, electrical issues, or theft (a RAID-1 array protects against drive failures – it is not a backup!)

Furthermore, I strongly recommend setting up the following security best practices:

  • Enable ssh login via public/private keys only
  • Enable AppArmor to restrict service accounts’ permissions (this can be very, very time-intensive and may break existing software)
  • Enable proper iptables rules
  • Secure your outgoing and incoming ports on your router and physical network (switches)
  • Secure your samba configuration


Running a home server can be taxing on free time and costs, but it does allow for some powerful computing capabilities in your own home. Following steps like the ones highlighted above allows you to properly own your data, have control over your backups, and be independent of cloud providers.

It is also a wonderful learning experience for everyone who has an interest in GNU/Linux and sysadmin tasks, but doesn’t want to be limited to small computing power (a Raspberry Pi) or purely fun projects (like running a barebones Debian on an old laptop).

While a regular external hard drive for backups plus cloud services for everything else might be plenty for most people, if you have the energy to be your own admin, I can highly recommend putting a 19″ rack in your house.

Disclaimer: In case you are married, you might need couples counseling after putting 100lbs of solid steel and a plethora of ethernet cables in your office.

Final Setup

Analyzing Reddit’s Top Posts & Images With Google Cloud (Part 2 – AutoML)


In the last iteration of this article, we analyzed the top 100 subreddits and tried to understand what makes a reddit post successful by using Google’s Cloud ML tool set to analyze popular pictures.

In this article, we will be extending the last article’s premise – to analyze picture-based subreddits with Dataflow – by using Google’s AutoML Vision toolset, training a model, and exposing it via REST to recognize new images.

The source code for this is available on GitHub under the GNU General Public License v3.0.

What is Reddit?

Reddit is a social network where people post pictures of cats and collect imaginary points, so-called “upvotes”.

Reddit (/ˈrɛdɪt/, stylized in its logo as reddit) is an American social news aggregation, web content rating, and discussion website. Registered members submit content to the site such as links, text posts, and images, which are then voted up or down by other members. Posts are organized by subject into user-created boards called “subreddits”, which cover a variety of topics including news, science, movies, video games, music, books, fitness, food, and image-sharing. Submissions with more up-votes appear towards the top of their subreddit and, if they receive enough votes, ultimately on the site’s front page.” (Wikipedia)

Reddit is the 3rd most popular site in the US and provides a wonderful basis for a lot of interesting, user-generated data.

Technology & Architecture

We will be partly re-using the architecture of the last article, with some slight adjustments here and there.


As we focus on the image recognition part, we upload a training set of images to Cloud Storage (along with our manual classifications), train an AutoML model, and access Reddit data from our desktop using REST.

The latter part can be automated in subsequent steps, e.g. using Dataflow and PubSub (you can find some prep-work on my GitHub page).

AutoML Vision

Google’s AutoML is a managed machine learning framework. While it is technically still in beta, it already provides a tremendous advantage: it more or less automates complex segments of “traditional” machine learning, such as image recognition (AutoML Vision) or NLP (AutoML Natural Language and AutoML Translation).

Specifically, AutoML Vision enables developers and engineers who are not familiar with the mathematical intricacies of image recognition to build, train, and deploy ML models on the fly – which is why we are going to use it here.

AutoML vs. Cloud Vision

While the Cloud Vision API gives us access to Google’s ever-growing set of data (that naturally is used to train the internal ML models), we can use AutoML to train our very own model with specific use cases that a common-ground approach – such as Cloud Vision – might not capture.

Now, let’s use one of my favorite topics, cars. The following picture shows the output of the Cloud Vision API, fed with a picture of my car, an E87 BMW 128i.

BMW 128i CloudVision

While it did classify the car as both “BMW” and “car”, it failed to recognize any specifics.

Let’s take another example, my old E85 BMW Z4 3.0i, from when I was living in Germany:

BMW Z4 3.0i CloudVision

Once again, it figured out we are dealing with a BMW, but neither the massive hood that houses the beauty that is the naturally aspirated three-liter I6, nor the fact that the roof is, in fact, missing, told Cloud Vision that this must be a Z4.

The main decision criteria here should be: Is it worth spending the extra effort to train your own model? Is your data set so specific and unique that it warrants its own model? Do you have proper Data Scientists in your organization that could do a (better) custom job?

In our case – yes, it is. So, time to train our own model, without having to deal with TensorFlow, massive coding efforts, or a Master’s in Statistics.

Getting data & classifying images

As we are trying to extend the idea of image recognition, we first need a set of images to get started on. For that, we will use /r/bmw where people show off their BMWs, mostly E30 and F80 M3s (keep at it folks, I cannot get enough of them). What could go wrong with user-generated content for training sets?

A simple script re-uses part of our existing reddit/praw and Python setup to pull the top posts from the subreddit, filter by the type “extMedia”, save each image under /tmp, and prepare a CSV file that we will use for classification later.

The resulting images wind up on Google Cloud Storage (GCS).

# encoding=utf8
from __future__ import print_function

import config
import os
import praw
import urllib
import re

from reddit.Main import get_top_posts

__author__ = "Christian Hollinger (otter-in-a-suit)"
__version__ = "0.1.0"
__license__ = "GNU GPLv3"

def unixify(path):
    # Replace anything that isn't filename-safe with underscores
    return re.sub('[^\w\-_\. ]', '_', path)

def get_image(post, img_path):
    filename = unixify(post.title)
    tmp_uri = '{}{}'.format(img_path, filename)
    print('Saving {url} as {tmp}'.format(url=post.content, tmp=tmp_uri))
    urllib.urlretrieve(post.content, tmp_uri)
    return tmp_uri, filename

def write_gcp(_input, _output, bucket_name):
    from import storage
    # Instantiates a client
    storage_client = storage.Client()

    # Gets bucket
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(_output)

    # Upload
    blob.upload_from_filename(_input)

    print('Uploading {} to bucket {}'.format(_output, bucket_name))

def csv_prep(gcs_loc):
    return '{gcs_loc}|\n'.format(gcs_loc=gcs_loc).encode('utf-8')

def main():
    # Get reddit instance (assuming the usual praw credentials in config)
    reddit = praw.Reddit(client_id=config.creddit['client_id'],
                         client_secret=config.creddit['client_secret'],
                         user_agent=config.creddit['user_agent'])
    # Set GCP path
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.cgcp['api_key']
    LIMIT = config.limit
    bucket_name = config.cgcp['images']

    # Settings (assumed to live in config alongside the credentials)
    subreddit = config.subreddit
    img_path = '/tmp/'

    # Get top posts
    top_posts = get_top_posts(subreddit, reddit, LIMIT)

    # Filter images
    images = filter(lambda p: p.type == 'extMedia', top_posts)

    csv = ''
    # Download images
    for post in images:
        tmp, filename = get_image(post, img_path)
        write_gcp(tmp, subreddit + '/' + filename, bucket_name)
        csv += csv_prep('gs://{bucket_name}/{subreddit}/{filename}'
                        .format(bucket_name=bucket_name, subreddit=subreddit, filename=filename))

    # Dump pre-classifier CSV
    with open(img_path + 'images.csv', 'a') as file:
        file.write(csv)

if __name__ == "__main__":
    main()

Now, here’s where the fun begins – we need to classify the training set by hand. It is generally recommended to have at least 100 images per category (Google actually offers a human-driven service for this!), but we are going to stick to less – it’s Saturday.

In order to simplify the model, I dumbed down my categories – 1 and 2 series, 3 and 4 series, 5 and 6 series, concept and modern, Z3, Z4, and Z8, as well as classics, such as the iconic M1 or 850CSi. The latter introduces way too much noise; however, having a folder full of those cars is fun on its own.

➜  bmw ls -d */

1-2series/  3-4series/ 5-6series/  classics/ concept-modern/  z3-4-8/
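Since the recommendation is roughly 100 images per label, a quick count over those label directories helps spot underpopulated classes (a sketch assuming one subdirectory per label, as in the listing above):

```python
import os

def images_per_label(root):
    """Count files in each immediate subdirectory (one directory per label)."""
    counts = {}
    for label in sorted(os.listdir(root)):
        path = os.path.join(root, label)
        if os.path.isdir(path):
            counts[label] = sum(
                os.path.isfile(os.path.join(path, f)) for f in os.listdir(path))
    return counts

# images_per_label("/tmp/bmw") would map each label directory to its image count
```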

Setting up AutoML

After labeling the images, we can proceed to AutoML and point to our CSV file. As the CSV contains the gs:// path and label, we are presented with an overview that looks like this:

Reddit AutoML Data

Once the labeling is complete, we can train the model from the Web UI. It will warn you if you don’t have enough images per label.

Label Warning

After the training is complete, we can see the model performance.

Model Performance (Reddit)

That does not look good. How did this happen?

The answer is fairly simple: All of reddit’s images are taken from various angles, in various lighting conditions, and with various model years. The noise in the images is too high to achieve a good result and we don’t have enough data to properly train the model.

Image Recognition Theory

In order to understand the issue, let’s talk theory for a second. Cloud ML uses TensorFlow to classify our images, exposing the model via an easy API.

As usual, this is not a scientific paper – I’m one of those engineer folks who use the research output, not a researcher. Things will be simplified and maybe even wrong. But hey, it works in the end!

What is TensorFlow?

“TensorFlow is an interface for expressing machine learning algorithms, and an implementation for executing such algorithms. […] In a TensorFlow graph, each node has zero or more inputs and zero or more outputs, and represents the instantiation of an operation. Values that flow along normal edges in the graph (from outputs to inputs) are tensors, arbitrary dimensionality arrays where the underlying element type is specified or inferred at graph-construction time. Special edges, called control dependencies, can also exist in the graph: no data flows along such edges, but they indicate that the source node for the control dependence must finish executing before the destination node for the control dependence starts executing.” (Abadi, M.; Agarwal, A.; Barham, P.; Brevdo, E.; Chen, Z.; Citro, C.; Corrado, G.S.; Davis, A.; Dean, J.; Devin, M.: Tensorflow: Large-scale machine learning on heterogeneous distributed systems. arXiv:1603.04467)

While AutoML uses Google’s NASNet approach to find the right architecture –

“Our work makes use of search methods to find good convolutional architectures on a dataset of interest. The main search method we use in this work is the Neural Architecture Search (NAS) framework proposed by [71]. In NAS, a controller recurrent neural network (RNN) samples child networks with different architectures. The child networks are trained to convergence to obtain some accuracy on a held-out validation set. The resulting accuracies are used to update the controller so that the controller will generate better architectures over time. The controller weights are updated with policy gradient (see Figure 1).”

Figure 1

(Barret Zoph, Vijay Vasudevan, Jonathon Shlens, Quoc V. Le: Learning Transferable Architectures for Scalable Image Recognition. arXiv:1707.07012v4)

…we will quickly talk about Convolutional Neural Networks.

“CNNs use a variation of multilayer perceptrons designed to require minimal preprocessing. They are also known as shift invariant or space invariant artificial neural networks (SIANN), based on their shared-weights architecture and translation invariance characteristics.

Convolutional networks were inspired by biological processes in that the connectivity pattern between neurons resembles the organization of the animal visual cortex. Individual cortical neurons respond to stimuli only in a restricted region of the visual field known as the receptive field. The receptive fields of different neurons partially overlap such that they cover the entire visual field.

CNNs use relatively little pre-processing compared to other image classification algorithms. This means that the network learns the filters that in traditional algorithms were hand-engineered. This independence from prior knowledge and human effort in feature design is a major advantage.”


Given the way these networks work – by analyzing an image’s binary components in a set of computational layers – it is easy to confuse the network with seemingly different, albeit actually very similar, images.

Bharath Ray’s article (link below) explains it as follows:

“It finds the most obvious features that distinguishes one class from another. Here [an example about cars taken from different angles], the feature was that all cars of Brand A were facing left, and all cars of Brand B are facing right”

Check out this article by Bharath Ray on Medium for more details on how to overcome this manually.

Adjusting the Model

The solution to our problem is fairly simple – we just need a better training set with more data from more images. Car pictures tend to be taken from very similar angles, in similar lighting conditions, and with similar trim styles of vehicles.

First, we’ll get ourselves proper training images from Google. Ironic, isn’t it?

Download Training Set

Next, simply create a mock-CSV and quickly populate the classifiers.

cd '/tmp/bmw/BMW 3 series/'
gsutil -m cp  "./*" gs://calcium-ratio-189617-vcm/bmw-1s
gsutil ls gs://calcium-ratio-189617-vcm/bmw-1s >> ../1series.csv
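The same CSV rows can also be generated from Python – a minimal sketch; the bucket path mirrors the gsutil commands above, while the file names and the label are illustrative placeholders:

```python
# Build an AutoML Vision classification CSV: one "gs://uri,label" row per image.
# File names (IMG_000x.jpg) and the label "bmw1series" are placeholders.
def build_classification_csv(uris, label):
    return "\n".join("{},{}".format(uri, label) for uri in uris)

uris = [
    "gs://calcium-ratio-189617-vcm/bmw-1s/IMG_0001.jpg",
    "gs://calcium-ratio-189617-vcm/bmw-1s/IMG_0002.jpg",
]
csv_rows = build_classification_csv(uris, "bmw1series")
print(csv_rows)
```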

After getting enough training data, we can go back to AutoML Vision and create a new model based on our new images.

Classification CSV

After importing the file, we are greeted with a much more usable training set:

Google AutoML Data

Now, when we evaluate the model, it looks a lot less grim:

Model Performance

Using our model with Reddit data

Now that we’ve figured out the issue with our training set, let’s try out the REST API.

We’ll use this image from reddit:

by /u/cpuftw at

And simply throw a REST request at it, using a simple Python 2 script:

import sys

from import automl_v1beta1
from import service_pb2

"""
This code snippet requests an image classification from a custom AutoML model
Usage: python2 $img $project $model_id
"""

def get_prediction(_content, project_id, model_id):
    prediction_client = automl_v1beta1.PredictionServiceClient()

    name = 'projects/{}/locations/us-central1/models/{}'.format(project_id, model_id)
    payload = {'image': {'image_bytes': _content }}
    params = {}
    request = prediction_client.predict(name, payload, params)
    return request  # waits till request is returned

if __name__ == '__main__':
    file_path = sys.argv[1]
    project_id = sys.argv[2]
    model_id = sys.argv[3]

    with open(file_path, 'rb') as ff:
        _content =

    print(get_prediction(_content, project_id, model_id))

And the results are…

Results M3

On point! Well, it is an M3, but that is still a 3 series BMW.

Next, remember my old Z4?


payload {
  classification {
    score: 0.999970555305
    display_name: "bmwz4"
  }
}
Yes, sir! That is, in fact, a Z4.


Now, what did we learn?

First off, using the Cloud Vision API simplifies things tremendously for the overwhelming majority of use cases. It gives you a very accurate output for most standard scenarios, such as detecting images not appropriate for your user base (for filtering user-generated content) or classifying and detecting many features in an image.

However, when the task becomes too specific, AutoML helps us to build our custom model without having to deal with the intricacies of a custom TensorFlow model. All we need to take care of is good training data and careful labeling before training the model. The simple REST API can be used just like the Cloud Vision API in your custom software.

I don’t know about you, but I’m a big fan – we managed to build a system that would otherwise require a lot of very smart Data Scientists. Granted, it will not achieve the accuracy a good Data Scientist can (on AutoML or not) – these folks know more than I do and can figure out model issues that I cannot; however, this is the key point. Any skilled Engineer with a basic understanding of ML can implement this system and advance your project with a custom ML model. Neato!

All development was done under Arch Linux on Kernel 4.18.12 with 16 AMD Ryzen 1700 vCores @ 3.6Ghz and 32GiB RAM

Continue Reading

Analyzing Reddit’s Top Posts & Images With Google Cloud (Part 1)

Entity Wordcloud

In this article (and its successors), we will use a fully serverless Cloud solution, based on Google Cloud, to analyze the top Reddit posts of the 100 most popular subreddits. We will be looking at images, text, questions, and metadata.

We aim to answer the following questions:

  • What are the top posts?
  • What is the content of the top posts? What types of images are popular?
  • When is the best time to post to reddit?
  • Is “99% of the Karma” really in the hands of the “top 1% of posters”?

This will be the first part of multiple; we will be focussing on the data processing pipeline and run some exemplary analysis on certain image-based subreddits using the Cloud Vision API.

The source code for this is available on GitHub under the GNU General Public License v3.0.

What is Reddit?

Reddit is a social network where people post pictures of cats and collect imaginary points, so-called “upvotes”.

“Reddit (/ˈrɛdɪt/, stylized in its logo as reddit) is an American social news aggregation, web content rating, and discussion website. Registered members submit content to the site such as links, text posts, and images, which are then voted up or down by other members. Posts are organized by subject into user-created boards called “subreddits”, which cover a variety of topics including news, science, movies, video games, music, books, fitness, food, and image-sharing. Submissions with more up-votes appear towards the top of their subreddit and, if they receive enough votes, ultimately on the site’s front page.”


Reddit is the 3rd most popular site in the US and provides a wonderful basis for a lot of interesting, user-generated data.

Technology & Architecture

We will be using the following technologies:

  • Python 2.7.3
  • Cloud Dataflow / Apache Beam
  • BigQuery
  • Google Cloud Storage (GCS)
  • Cloud ML / Vision API
  • Cloud Datalab

Resulting in the following architecture –


Compute Engine or Cloud Shell is used to run the data-gathering Python script and store the data to Cloud Storage.

Dataflow and Cloud Vision API will be used to process the data and store it to BigQuery.

DataLab will be used to analyze & visualize the data.

Gathering Data

For gathering the initial data, we will use a simple Python script using the reddit praw library. You can run this from your Google Cloud Shell or your local desktop (or a Compute Engine instance).

This code will do the following:

  • Pull the “top” posts of all time from the 100 most popular subreddits, up to a limit you define (I took 1,000 for this article)
  • Detect the type of post:
    • Self – text
    • Question – simply a title (like in /r/askreddit)
    • extMedia – external media (images, videos)
    • Link – external links, e.g. to blog posts
  • Add a unique ID to the post by MD5-hashing the title and timestamp
  • Store the result as JSON, split by subreddit
  • Upload the JSON to GCS
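The unique-ID step can be tried in isolation – plain hashlib, using the same scheme as the Post class below:

```python
import hashlib

def post_id(title, date_iso):
    # Deterministic key: the same title + timestamp always hash to the same ID.
    return hashlib.md5((title + str(date_iso)).encode('utf-8')).hexdigest()

pid = post_id("Meeting Keanu Reeves at a traffic light", 1515704703)
print(pid)  # 32 hex characters
```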
class DictEncoder(json.JSONEncoder):
    def default(self, obj):
        return obj.__dict__

class Post:
    def __init__(self, title, subreddit, author, upvotes, date_iso, link, type, num_comments, content): = hashlib.md5((title + str(date_iso)).encode('utf-8')).hexdigest()
        self.title = title
        self.subreddit = subreddit = author
        self.upvotes = upvotes
        self.date_iso = int(date_iso) = link
        self.type = type
        self.num_comments = num_comments
        self.content = content

    def __str__(self):
        return "{title}, upvotes: {up}, date: {date}, link: {link}, content: {content}".format(
            title=self.title, up=self.upvotes,
            date=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.date_iso)).encode('utf8'),
  , content=self.content)
def get_top_posts(subreddit, reddit, limit):
    # Store posts
    posts = []

    for submission in reddit.subreddit(subreddit).top(limit=limit):
        if submission.pinned:
            continue

            if submission.is_self and submission.selftext is not None:
                # Self post - text
                content = submission.selftext
                _type = 'self'
            elif submission.is_self and submission.selftext is None:
                # Self post - no header - askreddit etc.
                content = submission.title
                _type = 'question'
            elif submission.url is not None and submission.preview is not None and len(submission.preview) > 0 \
                    and 'images' in submission.preview and len(submission.preview['images']) > 0:
                # External media - store preview if available
                content = submission.preview['images'][0].get('source').get('url')
                _type = 'extMedia'
            elif submission.url is not None and is not None:
                # External media
                content = submission.url
                _type = 'extMedia'
            elif submission.url is not None and is None:
                # External link
                if 'imgur' in submission.url or '.jpg' in submission.url or '.png' in submission.url or '.gif' in submission.url:
                    _type = 'extMedia'
                    _type = 'link'
                content = submission.url
                # Empty post
                content = None
                _type = 'none'

            post = Post(submission.title, submission.subreddit_name_prefixed,,
                        submission.ups, submission.created, submission.permalink,
                        _type, submission.num_comments, content)
            print("subreddit: {subreddit}".format(subreddit=submission.subreddit_name_prefixed))
        except Exception as e:

        # Honor fair use terms - 60 requests per minute

    return posts

def write_json_gcp(_input=config.creddit['file'], _output=config.cgcp['file'], bucket_name=config.cgcp['bucket']):
    from import storage
    # Instantiates a client
    storage_client = storage.Client()

    # Gets bucket
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(_output)

    # Upload
    print('Uploaded {} to {} in bucket {}'.format(_input, _output, bucket_name))

def main():
    # Get reddit instance (client_secret/user_agent config keys assumed)
    reddit = praw.Reddit(client_id=config.creddit['client_id'],
                         client_secret=config.creddit['client_secret'],
                         user_agent=config.creddit['user_agent'])
    # Set GCP path
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.cgcp['api_key']
    LIMIT = config.limit

    # Define top subreddits
    csv = 'subreddit|type|title|upvotes|num_comments|content|author|date\n'
    subreddits = ['announcements', 'funny']
    # (Full list on GitHub)
    posts = []
    flat_json = ''
    # Enable for debugging
    #subreddits = ['pics']

    for subreddit in subreddits:
        flat_json = ''
            top_posts = get_top_posts(subreddit, reddit, LIMIT)
            posts = posts + top_posts

            for post in top_posts:
                flat_json += json.dumps(post.__dict__) + '\n'

            if config.use_json_array == 'true':
                # Write back JSON as an array
                with open(subreddit + config.creddit['file'], 'a') as file:
                    file.write(json.dumps([ob.__dict__ for ob in posts]))
                # Write back JSON one line at a time for Dataflow
                with open(subreddit + config.creddit['file'], 'a') as file:

            write_json_gcp(subreddit + config.creddit['file'], subreddit + config.creddit['file'])
        except Exception as e:
            print('Encountered error, skipping record')

if __name__ == "__main__":
The resulting JSON has the following structure:

{
  "date_iso": 1515704703,
  "author": "unknown_human",
  "num_comments": 4109,
  "title": "Meeting Keanu Reeves at a traffic light",
  "subreddit": "r/pics",
  "content": "",
  "link": "/r/pics/comments/7pnxv2/meeting_keanu_reeves_at_a_traffic_light/",
  "upvotes": 200378,
  "type": "extMedia",
  "id": "4f6541b8b9b98e26346a228312a1b662"
}

The reason we don’t run this through a distributed system is reddit’s “Fair Use” API policy, which limits us to 1 API call per second, therefore rendering high-performance computing fairly pointless.
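A one-call-per-second budget is easy to enforce with a tiny pacer – a sketch (the interval is shortened here so the demo runs quickly; for reddit it would be 1.0 second):

```python
import time

class Pacer(object):
    """Sleeps just enough so consecutive calls are at least `interval` seconds apart."""
    def __init__(self, interval):
        self.interval = interval
        self._last = 0.0

    def wait(self):
        delta = time.time() - self._last
        if delta < self.interval:
            time.sleep(self.interval - delta)
        self._last = time.time()

pacer = Pacer(0.05)  # use Pacer(1.0) for reddit's 60 requests/minute
start = time.time()
for _ in range(3):
    pacer.wait()       # first call is free; the next two are paced
elapsed = time.time() - start
```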

Cloud Vision API

Before we dive into the data processing on the cloud, let’s quickly talk about image recognition.

Google’s Cloud Vision API is a powerful tool to quickly analyze an image’s content and detect its relevant features and relative importance within the image.

It abstracts the actual machine learning models from the user and makes it a fantastic tool to integrate in any data processing pipeline, as it doesn’t require you to actually figure out your own model or to train it. While CloudML does enable you to figure all this out with, say, TensorFlow, chances are that a lot of use cases will not require that level of effort.

Take this example of a picture of an otter I took in the Atlanta Zoo the other day and review the labels the Cloud Vision API returned:

Cloud Vision Example 1

These are the labels the vision API detected –

  "labelAnnotations": [
    {
      "mid": "/m/035qhg",
      "description": "fauna",
      "score": 0.9405464,
      "topicality": 0.9405464
    },
    {
      "mid": "/m/089v3",
      "description": "zoo",
      "score": 0.8177689,
      "topicality": 0.8177689
    },
   ...  ]

As you can see, the seemingly main content of the image, the animal, has a relatively low score, as it is only a small part of the image. It did, however, interpolate the fact that this was taken in the zoo (as opposed to in the wild) based off the image’s other features, such as the artificial riverbed.

Now, compare it to another picture of two otters I took in a different Zoo in the UK a couple of years ago:

Cloud Vision Example 2

Here, we can clearly see that the model correctly identified the content, but got into a much higher level of detail, given the absence of noise from the surroundings.

Taking this into account, we need to keep a couple of things in mind about our data:

  • We need to filter out low probabilities
  • We need to ensure not to consider too generic terms, for instance “meal” when looking at images from /r/food
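Both filters are a list comprehension each – a sketch over the labelAnnotations shape shown above (the threshold and the stoplist are arbitrary choices for illustration):

```python
# Vision API labelAnnotations, trimmed to the fields we filter on.
labels = [
    {"description": "fauna", "score": 0.94},
    {"description": "meal",  "score": 0.91},
    {"description": "zoo",   "score": 0.82},
    {"description": "plant", "score": 0.41},
]

MIN_SCORE = 0.7                      # drop low-probability labels
GENERIC = {"meal", "food", "dish"}   # too generic for, e.g., /r/food

kept = [l["description"] for l in labels
        if l["score"] >= MIN_SCORE and l["description"] not in GENERIC]
print(kept)  # ['fauna', 'zoo']
```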

You can try out the Cloud Vision API here:

Introducing Data Flow

In the next step, we utilize Cloud Data Flow to process the data further.

Dataflow is based on the Apache Beam API and is an auto-scaling data-processing framework. It follows a fairly simple programming model in either Python or Java, relying on immutable, distributed collections (PCollections) and functions that get applied to one line of an input file at a time.

Dataflow is fully managed (serverless) and auto-scales to more processing nodes when required.

Similar to Apache Spark, Beam can use the same code for streaming and batch data. You can also run it on e.g. Flink or Spark, but for the sake of having a serverless architecture, we will focus on Data Flow.

For more details, I will refer you to the official Apache Beam documentation.

The Data Flow Pipeline

Data Flow Pipeline

We will be doing the following processing steps:

  • Read the JSON, decode it
  • Split records with images (type extMedia)
  • Get the image*
  • Apply the VisionAPI
  • Store the results (image VisionAPI output and posts) to BigQuery in two separate tables


First off, we read the initial JSON and decode it to a Python dict. The current example reads one JSON at a time; you could also read multiple files.

    with beam.Pipeline(options=pipeline_options) as p:
        records = (
            p |
            ReadFromText(known_args.input, coder=JsonCoder()) |
            'Splitting records' >> beam.ParDo(Split())
        )
The code also splits the inputs by their type tag to identify images.

class Split(beam.DoFn):
    def process(self, record):

        _type = record['type']
        if _type == 'self' or _type == 'link':
            return [{
                'post': record,
                'image': None
        elif _type == 'extMedia':
            return [{
                'post': record,
                'image': record['content']
            return None
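Since a DoFn’s process is an ordinary method, the routing can be exercised without a Beam runner – a dependency-free re-statement of the same logic:

```python
def split_record(record):
    # Mirrors the Split DoFn: text posts carry no image, extMedia carries its URL.
    _type = record['type']
    if _type in ('self', 'link'):
        return [{'post': record, 'image': None}]
    elif _type == 'extMedia':
        return [{'post': record, 'image': record['content']}]
    return None  # e.g. 'question' and 'none' records are dropped here

img = split_record({'type': 'extMedia', 'content': 'http://example.com/a.jpg'})
txt = split_record({'type': 'self', 'content': 'hello'})
```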

Next, we get the image from the data, store it to GCS, apply the VisionAPI, and finally return another dict for our images table. We resize the image to ensure we don’t hit the Vision API’s 10MiB file limit per request.

    def process(self, record):'Image: ' + record['image'])
        tmpuri = self.tmp_image_loc + record['post']['id'] + '.jpg'
        # Download the image, upload to GCS
        urllib.urlretrieve(record['image'], tmpuri)
        self.write_gcp(tmpuri, self.outputloc + record['post']['id'] + '.jpg', self.bucket)
        labels = self.get_vision(tmpuri, record['post']['id'])

The resulting data contains the unique ID of the post, the subreddit, and the label, along with its topicality (the relevancy of the detected feature in the image) and its score.

Lastly, we write the results back to BigQuery:

            posts | 'Write to BQ' >>
                known_args.output, schema=post_schema))  # schema (post_schema) defined elsewhere in the pipeline

In order to run this pipeline, upload the file to Cloud Storage, download it to Cloud Shell and execute the following script:

python -m DataFlowReddit \
  --project ${PROJECT} \
  --runner DataflowRunner \
  --input ${INPUT} \
  --temp_location gs://${BUCKET}/tmp/ \
  --bucket ${BUCKET} \
  --staging_location gs://${BUCKET}/stg/ \
  --tmp /tmp/ \
  --useBigQuery true \
  --output reddit.posts \
  --imgOutput reddit.images \
  --requirements_file requirements.txt \
  --max_num_workers 24

During execution, you can always launch StackDriver to analyze the logs for any failures or progress:


* As a disclaimer – it is generally speaking not a brilliant idea to run a very expensive operation – like multiple REST calls – on a pipeline. For the sake of simplicity, we will stick to the idea.

A first look at the data

To take a look at the resulting data, we run a simple BigQuery query on the Web UI to inspect the most prominent features of a hand-picked subreddit, for instance /r/pics, where people post pictures of all sorts of things. Keep in mind, the BigQuery Preview feature is free.

Big Query Example

We can see an overview of posts, upvotes, comments, the content (in this case, a link to the image), and the id. We will use this data to process the data further in DataLab.

Analyzing the data with DataLab

For analyzing the data, we use DataLab. DataLab is pretty much comparable to Apache Zeppelin – a live, web-based notebook based on Jupyter that enables us to analyze and visualize data in notebooks that can be updated live and easily shared.

It exposes all Google Cloud components in a simple Python environment where we can work on our data with Pandas Dataframes. If you are familiar with Apache Spark, this will come naturally.

In order to get our data, we use the %%bq directive in DataLab to store our query results from BigQuery into a variable. We then expose it to a Pandas Dataframe to take a look at the top results – you can run further data processing here.

%%bq query --name pics
SELECT id,upvotes,num_comments,title, CAST(TIMESTAMP_SECONDS(date_iso) AS DATETIME) AS dt
FROM `reddit.posts`
WHERE lower(subreddit) = 'r/pics'
ORDER BY dt desc
LIMIT 1000;
%%bq query --name picsImg
SELECT description,count(*) as c,sum(score) as score FROM `reddit.images`
where (lower(subreddit) like '%pics') and score>0.7 
group by description
order by c desc
LIMIT 1000
import pandas as pd
import as storage
from google.datalab import Context
import google.datalab.bigquery as bq
from io import BytesIO

# Variables
project = Context.default().project_id
# Get dataframe
df_data = pics.execute(output_options=bq.QueryOutput.dataframe()).result()

df_img_data = picsImg.execute(output_options=bq.QueryOutput.dataframe()).result()
df_img_data_25 = df_img_data.head(25)


Next, we plot our results using our Pandas DataFrame from before.

df_plot_data = df_img_data_25[['description','c']]
ax = df_plot_data.plot(kind='bar',x='description',title='Top image labels')

Top labels in /r/pics

As we can see, apparently having a lot of (seemingly beautiful) sky in your pictures gets you far. Trees, water or girls seem to help as well.

We can also take a look at another popular subreddit in combination with /r/pics. As the unnamed subreddit (for SEO reasons…) is focussed on nature, we get a much broader result concerning “nature” related labels –

Popular Nature Labels

Finally, let’s look at how the Vision API labeled one of the top posts, “Almost slept through this amazing sunrise at Monument valley, was glad I went out anyway! USA (OC)[1920×1920]” by /u/crpytodesign with 30k+ upvotes and ID ae99d5a9e877f9ce84087516f1170c70.

By simply running another %%bq directive, we can get the labels:

Example Image Labels

Last but not least, let’s generate a nice and simple wordcloud on DataLab using the wordcloud library:

Entity Wordcloud

Granted, this is not exactly an in-depth analysis – but it does illustrate the point on how to work with the data we gathered.

A word about cost

Google Cloud bills you based on usage and the services we are using scale automatically. While certain services are offered free of charge, it is still important to understand that the cost of the deployed solution will be based on your job’s efficiency and data volume.

Dataflow, for instance, has the following cost structure (as of June 2018):

While you can control the maximum number of workers, an inefficient job will rack up costs fairly quickly.

BigQuery’s pricing is based on the data that your query processes –

This means an inefficient query that has to read a huge set of data will result in an equally huge bill. Keep in mind that using LIMIT operations will not affect this, as it depends on the columns and resulting data volume that your query processes.

A similar concept applies to the ML APIs – just take a look at my billing data at the time of writing this article:

Vision API Pricing


While this exercise was simple in nature, it did illustrate certain key concepts –

  • How to utilize a fully-managed Cloud environment on Google Cloud
  • How to gather data and use Cloud Storage as a location for further processing
  • How to use Cloud Dataflow to process data without having to worry about scaling nodes and even be prepared for a streaming application
  • How to simply integrate powerful Machine Learning models
  • How to use resources on-demand
  • How to analyze data with simple notebooks and even chart the data

If we compare this effort to a “traditional” Hadoop approach, we spot some major differences and advantages –

  • Simple, non-demanding development environment
  • Existing tools and frameworks most of us are familiar with or at least can get familiar with very quickly (Python, Apache Beam <-> Spark/Storm…, DataLab <-> Zeppelin/Jupyter, BigQuery <-> Hive/Impala etc.)
  • Barely any effort to manage environments, nodes, scaling, high-availability and other administrative tasks
  • High throughput and performance without big optimization work

We also noticed some drawbacks and caveats to the approach –

  • Google Cloud is billed on-demand – while it does prove to lower the overall TCO of a data environment, it is easy to accidentally run expensive queries, start too many workers, or to rack up a high bill by being careless
  • We do lack the full control a completely Open Source solution might provide, given enough developer resources

Regardless of this – and other, more enterprise-level considerations – Google Cloud provided a great backend for a solution which would have been undoubtedly more complicated to realize using traditional Hadoop methodologies and tools.

In the next iteration of the article, we will run some more in-depth analysis and run a similar pipeline on other types of posts, such as text-based submissions.

All development was done under Fedora 27 4.16.13-200.fc27.x86_64 with 16 AMD Ryzen 1700 vCores @ 3.6Ghz and 32GiB RAM

Continue Reading

Analyzing Twitter Location Data with Heron, Machine Learning, Google’s NLP, and BigQuery


In this article, we will use Heron, the distributed stream processing and analytics engine from Twitter, together with Google’s NLP toolkit, Nominatim and some Machine Learning as well as Google’s BigTable, BigQuery, and Data Studio to plot Twitter user’s assumed location across the US.

We will show how much your Twitter profile actually tells someone about you, how it is possible to map your opinions and sentiments to parts of the country without having the location enabled on the Twitter app, and how Google’s Cloud can help us achieve this.

About language, locations, social media, and privacy

While it is safe to assume that most Twitter users do not enable the Location Services while using the Social network, we can also assume that a lot of people still willingly disclose their location – or at least something resembling a location – on their public Twitter profile.

Furthermore, Twitter (for the most part) is a public network – and a user’s opinion (subtle or bold) can be used for various Data Mining techniques, most of which do disclose more than meets the eye.

Putting this together with the vast advances in publicly available, easy-to-use, cloud-driven solutions for Natural Language Processing (NLP) and Machine Learning (ML) from the likes of Google, Amazon, or Microsoft, any company or engineer with the wish to tap this data has more powerful tool sets at their disposal than ever before.

Scope of the Project

For this article, we will write a Heron Topology that does the following –

  • Read from Twitter, given certain seed keywords, filtering out users that do not disclose any location information, either as metadata or profile data
  • Use the Google NLP service to analyze the tweets and a user’s location data
  • Use Nominatim (based on OpenStreetMap data) to apply reverse-geocoding on the results
  • Use DBSCAN clustering with a Haversine distance metric to cluster our results
  • Write the results back to Google BigTable or BigQuery

Then, we’ll visualize the results with Data Studio.
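The Haversine metric used in the clustering step fits in a few lines – a Python sketch assuming a spherical Earth with a mean radius of 6371 km (the Java implementation appears later in the article):

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance between two (lat, lon) points given in degrees.
    r = 6371.0  # mean Earth radius in km (spherical approximation)
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp = math.radians(lat2 - lat1)
    dl = math.radians(lon2 - lon1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))

# Atlanta -> New York City, roughly 1,200 km great-circle
d = haversine_km(33.749, -84.388, 40.7128, -74.0060)
```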


The architecture for this process is fairly simple:

Architecture (simplified)

Heron serves as our Stream processing engine and local ML, Nominatim on Postgres serves as Geo-Decoder.

On Google Cloud, we use the NLP API to enrich data, BigTable and BigQuery for storage and Data Studio for visualization.

BigTable (think HBase) is used for simple, inexpensive mass-inserts, while BigQuery is used for analytics. For the sake of simplicity, I’ll refer to one of my old articles which explains quite a bit about when to use BigTable/HBase and when not to.

Hybrid Cloud

While the notion of “Hybrid Cloud” warrants its own article, allow me to give you an introduction what this means in this context.

For this article, I heavily utilized the Google Cloud stack. The Google NLP API provides me simple access to NLP libraries, without extensive research or complex libraries and training sets.

Google BigTable and BigQuery provide two serverless, powerful data storage solutions that can be easily implemented in your programming language of choice – BigTable simply uses the Apache HBase Interface.

Google Data Studio can access those Cloud-based sources and visualize them similar to what e.g. Tableau can achieve, without the need to worry about the complexity and up-front cost that come with such tools (which doesn’t imply Data Studio can do all the things Tableau can).

At the same time, my Nominatim instance as well as my Heron Cluster still run on my local development machine. In this case, the reason is simply cost – setting up multiple Compute Engine and/or Kubernetes instances simply quickly exceeds any reasonable expense for a little bit of free-time research.

When we translate this into “business” terminology – we have a “legacy” system which is heavily integrated in the business infrastructure and the capital expense to move to a different technology does not warrant the overall lower TCO. Or something along those lines…

The following section describes the implementation of the solution.

Reading from Twitter

First off, we are getting data from Twitter. We use the twitter4j library for this. A Heron Spout consumes the data and pushes it down the line. We use a set of keywords defined in the configuration to consume an arbitrary topic.

Here, we ensure that a tweet contains location information, either from Twitter or via the user’s own profile.

if (location != null) {
    Document locDoc = Document.newBuilder()

    List<Entity> locEntitiesList = language.analyzeEntities(locDoc, EncodingType.UTF16).getEntitiesList();
    String locQuery = getLocQueryFromNlp(locEntitiesList);

    // Build query
    NominatimSearchRequest nsr = nomatimHelper.getNominatimSearchRequest(locQuery);

    // Add the results to a query, if accurate
    JsonNominatimClient client = nomatimHelper.getClient();
    ArrayList<String> reverseLookupIds = new ArrayList<>();
    List<Address> addresses = new ArrayList<>();
    if (!locQuery.isEmpty()) {
        addresses =;
        for (Address ad : addresses) {
            logger.debug("Place: {}, lat: {}, long: {}", ad.getDisplayName(),
                    ad.getLatitude(), ad.getLongitude());
            Location loc = LocationUtil.addressToLocation(ad);

            // Filter out points that are not actual data points
            String osmType = ad.getOsmType();

            if (osmType != null && (osmType.equals("node") || osmType.equals("relation") ||
                    osmType.equals("administrative")) && loc.isWithinUSA()) {

You could re-route users that do not use location data to an alternate pipeline and still analyze their tweets for sentiments and entries.

Understanding Twitter locations

The next bolt applies the Google Natural Language Toolkit on two parts of the tweet: its content and the location.

For an example, let’s use my own tweet about the pointless “smart” watch I bought last year. If we analyze the tweet’s text, we get the following result:

(Granted, this isn’t the best example – I simply don’t tweet that much)

For this article, we won’t focus too much on the actual content. While it is a fascinating topic in itself, we will focus on the location of the user for now.

When it comes to that, things become a little more intriguing. When people submit tweets, they have the option to add a GPS location to their tweet – but unless you enable it, this information simply returns null. Null Island might be a thing, but not a useful one.

However, we can also assume that many users use their profile page to tell others something about themselves – including their location. As this data gets exposed by Twitter’s API, we get something like this:

While a human as well as any computer can easily understand my profile – it means Atlanta, Georgia, United States Of America, insignificant little blue-green planet (whose ape-descended lifeforms are so amazingly primitive that they still think digital smart watches are a great idea), Solar System, Milky Way, Local Group, Virgo Supercluster, The Universe – it’s not so easy for the more obscure addresses people put in their profile.

A random selection –

  • What used to be Earth
  • Oregon, (upper left)
  • in a free country
  • MIL-WI
  • Oban, Scotland UK 🎬🎥🎥🎬
  • United States
  • your moms house
  • Between Kentucky, Ohio
  • Savannah Ga.
  • Providece Texas A@M Universty
  • USA . Married over 50 yrs to HS Sweetheart
  • 24hrs on d street hustling

(All typos and emojis – the tragedy of the Unicode standard – were copied “as is”)

In case you are wondering, “your moms house” is in Beech Grove, IN.

The sample set I took most of this data from had only 25 entries and was parsed by hand. Out of those, 16 used ANSI standard INCITS 38:2009 for the state names, but in various formats.

A whole 24% used what I called “other” – like “your moms house”.

On a bigger scale, out of a sample set of 22,800 imported tweets, 15,630 (68%) had some type of location in their profile, but only 11 had their actual GPS location enabled.


For the time being, we can conclude that most Twitter users tell us something about their location – intentional or not. For a more scientific approach, here’s a link – keep in mind that my data is a random selection.

However – using user-entered data always results in messy, fuzzy, non-structured data. This has been a problem way before the terms “Machine Learning” or “Analytics” exceeded any marketing company’s wildest dreams. Levenshtein-Distance record matching, anyone?
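The Levenshtein distance mentioned above – the classic tool for fuzzy record matching – is a short dynamic program; a sketch:

```python
def levenshtein(a, b):
    # Rolling-row dynamic program: prev[j] holds the edit distance
    # between the processed prefix of a and b[:j].
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1,                 # deletion
                           cur[j - 1] + 1,              # insertion
                           prev[j - 1] + (ca != cb)))   # substitution
        prev = cur
    return prev[-1]

d = levenshtein("kitten", "sitting")  # classic example: distance 3
```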

Using NLP to identify entities & locations

At this point, Google’s NLP toolkit comes into play again. We use the NLPT to get all locations from the user’s self-entered “place” to identify everything that has the LOCATION metadata flag.

This is simple for something like this:

“Oregon” is clearly the location we need. We were able to strip off “upper left” – and could even weigh this based on the specific salience value.

However, more obscure queries result in more random results:

But even here, we get the core of the statement – USA – with a high confidence level.

The more random place descriptions (such as “a free country”) naturally only produce low-quality results – which is why we should filter results with a low salience score. While this only means “relative to this set of text, this entity is relatively important/unimportant”, it does serve as a rough filter.
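To illustrate the filter itself, here is a minimal salience cutoff – note that the `EntityResult` type is a simplified, hypothetical stand-in for the real NLP response, and any concrete threshold value is an assumption:

```java
import java.util.ArrayList;
import java.util.List;

public class SalienceFilter {
    // Simplified stand-in for an NLP entity result: the real response
    // carries type, salience, and metadata per entity.
    public static class EntityResult {
        public final String name;
        public final double salience;
        public EntityResult(String name, double salience) {
            this.name = name;
            this.salience = salience;
        }
    }

    // Keep only entities whose salience clears the given threshold.
    public static List<EntityResult> filter(List<EntityResult> entities, double minSalience) {
        List<EntityResult> out = new ArrayList<>();
        for (EntityResult e : entities) {
            if (e.salience >= minSalience) out.add(e);
        }
        return out;
    }
}
```

For “Oregon, (upper left)”, a high-salience “Oregon” survives while a low-salience “upper left” is dropped.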

In order to use more standardized data, we can also use the wikipedia_url property of the NLP toolkit (if available) and extract a more useful string. This turns “Baltimore, MD” into “Baltimore Maryland”, for instance.

However, “Atlanta” turns into “Atlanta (TV Series)” – so use it with caution.

Reverse Geocoding & Clustering indecisive answers

In the next step, assuming we received unstructured location data, we try to convert that data into a query for a location service that supports reverse geocoding. Location services and their respective APIs are plentiful online – as long as one is able to convert a query of location data into a set of potential matches, we can use it.

In this case, we are using the Nominatim client from OpenStreetMap. Given the data volume, it is advisable to host Nominatim on Kubernetes or your local development machine – the OpenStreetMap servers will block your IP if you accidentally DDoS them or simply don’t respect their Fair Use Policy – and the velocity of streaming tends to violate basically every Fair Use clause in existence.

OpenStreetMap will return a list of potential matches. Take this example when our location is “Springfield” and we limit the results to 20:

Springfield data points

As you can see, this is not conclusive. So we need to find a way to figure out which result is most accurate.

Fun Fact: Without a country boundary on the US with Nominatim, this is what “Detroit Michigan” produces:

Using clustering to approximate locations

In order to figure out where on the map our result is, we use Density-based spatial clustering of applications with noise (DBSCAN), a clustering algorithm that maps points by their density and also takes care of any outliers.

DBSCAN illustration

I found this article’s description of the algorithm most conclusive. Short version – for a dataset of n-dimensional data points, an n-dimensional sphere with the radius ɛ is defined, as well as the data points within that sphere. If the number of points in the sphere exceeds a defined min_points, a cluster is formed. For all points except the center, the same logic is applied recursively.

DBSCAN requires the ɛ parameter to be set to the maximum distance between two points for them to be considered part of the same neighborhood. In order to set this parameter to a meaningful value, we use the Haversine distance to get the orthodromic distance on a sphere – in our case, a rough approximation of the earth – and therefore a result in kilometers between locations.

The Haversine function is defined as such –

d = 2r · arcsin(√(sin²((φ2 − φ1)/2) + cos(φ1) · cos(φ2) · sin²((λ2 − λ1)/2)))

where
  • d is the distance between the two points (along a great circle of the sphere; see spherical distance),
  • r is the radius of the sphere,
  • φ1, φ2: latitude of point 1 and latitude of point 2, in radians
  • λ1, λ2: longitude of point 1 and longitude of point 2, in radians

In our case, r is defined as the radius of the earth, 6,371 km.
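As a sanity check, here is the formula as a standalone sketch (independent of the project’s MathHelper class used below):

```java
public class HaversineCheck {
    static final double R = 6371.0; // earth radius in km

    static double toRad(double deg) { return Math.toRadians(deg); }

    // d = 2r * arcsin(sqrt(sin^2(dLat/2) + cos(lat1)*cos(lat2)*sin^2(dLon/2)))
    public static double haversine(double lat1, double lon1, double lat2, double lon2) {
        double dLat = toRad(lat2 - lat1);
        double dLon = toRad(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(toRad(lat1)) * Math.cos(toRad(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * R * Math.asin(Math.sqrt(a));
    }
}
```

`haversine(40.7128, -74.0060, 34.0522, -118.2437)` yields roughly 3,940 km – about the New York–Los Angeles great-circle distance.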

To combine those, we can use the Apache Commons Math package. All we need to do is implement the DistanceMeasure interface (as well as the function for the Haversine distance itself).

public static List<Cluster<Location>> dbscanWithHaversine(ArrayList<Location> input) {
    DBSCANClusterer<Location> clusterer = new DBSCANClusterer<>(EPS, MIN_POINTS, new HaversineDistance());
    return clusterer.cluster(input);
}

public class HaversineDistance implements DistanceMeasure {
    @Override
    public double compute(double[] doubles, double[] doubles1) throws DimensionMismatchException {
        if (doubles.length != 2 || doubles1.length != 2)
            throw new DimensionMismatchException(doubles.length, doubles1.length);

        Location l1 = new Location("A", doubles[0], doubles[1], 0, "N/A");
        Location l2 = new Location("B", doubles1[0], doubles1[1], 0, "N/A");
        return MathHelper.getHaversineDistance(l1, l2);
    }
}

public static double getHaversineDistance(Location loc1, Location loc2) {
    double latDistance = toRad(loc2.getLatitude() - loc1.getLatitude());
    double lonDistance = toRad(loc2.getLongitude() - loc1.getLongitude());
    // note: the second cosine must use loc2's latitude
    double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2) +
            Math.cos(toRad(loc1.getLatitude())) * Math.cos(toRad(loc2.getLatitude())) *
                    Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
    double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    return R * c;
}

By putting 2 and 2 together, we get a DBSCAN implementation that accepts an ɛ in kilometers.

While choosing the parameters for the DBSCAN algorithm can be tricky, we use 150 km / 93 mi as the maximum radius of a cluster and assume that a single point is valid for a cluster. While this, in theory, produces a lot of noise, it works well for clustering our location set.

For interpreting our clusters (and choosing the one that is seemingly correct), we rely on the average “importance” value of OpenStreetMap, which aggregates multiple importance metrics (from, e.g., Wikipedia) to “score” a place.

If the user’s location contains part of a state as String (e.g., Springfield, IL), we increase the importance score during execution.
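A sketch of that boost – the `boostImportance` helper and the 20% factor are hypothetical; the real implementation works on the Nominatim result object:

```java
public class ImportanceBoost {
    // If the user-entered location mentions the state of a candidate result
    // (e.g. "Springfield, IL" against a result in Illinois), boost that
    // result's OpenStreetMap importance score. The 20% factor is arbitrary.
    // Caution: contains() on two-letter abbreviations can false-positive
    // inside longer words; a real implementation should tokenize first.
    public static double boostImportance(String userLocation, String resultState,
                                         String resultStateAbbrev, double importance) {
        String needle = userLocation.toLowerCase();
        if (needle.contains(resultState.toLowerCase())
                || needle.contains(resultStateAbbrev.toLowerCase())) {
            return importance * 1.2;
        }
        return importance;
    }
}
```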

Springfield, data points


Springfield, avg. importance by cluster

In our Springfield example, Springfield, IL is the capital of Illinois – and that is one of the reasons why the OpenStreetMap data ranks it higher than the entire Springfield, MA cluster (which consists of Springfield, MA, Springfield, VT, and Springfield, NH – despite their combined population being bigger than that of the Illinois state capital).

Assuming we have multiple points in a cluster, we take the average between all coordinates in it. This is accurate enough in a ~90mi radius and results in the rough center of the (irregular) polygon that is our cluster.
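The averaging itself is a one-liner per axis – a sketch with points given as hypothetical {latitude, longitude} pairs:

```java
public class ClusterCenter {
    // Average all coordinates in a cluster to get its rough center.
    // Good enough at ~90mi scale; it would break down for clusters
    // spanning the antimeridian, which US state data does not.
    public static double[] center(double[][] points) {
        double lat = 0, lon = 0;
        for (double[] p : points) {
            lat += p[0];
            lon += p[1];
        }
        return new double[] { lat / points.length, lon / points.length };
    }
}
```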

While I’ve just explained that Springfield, IL could be considered an accurate result, in order to illustrate the averaging, we simply remove Springfield, IL from the equation and our “best” cluster looks like this:

Sample Cluster for Springfield, MA

(The red dot in the middle is the average coordinate result)

Finally, we retrofit the calculated location data to a US state. For this, we have 2 options –

  • Call Nominatim again, resulting in another relatively expensive API call
  • Approximate the result by using a local list of rough state boundaries

While both methods have their pros and cons, using a geo provider will undoubtedly produce more accurate results, especially in cities like NYC or Washington, DC, where state borders can be close to any given point.

For the sake of simplicity and resource constraints, I’m using a singleton implementation of a GSON class that reads a list of US states with rough boundaries that I’ve mapped from XML to JSON.
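A minimal sketch of that lookup, with two hard-coded, very rough bounding boxes standing in for the JSON file (both the boxes and the `stateFor` helper are illustrative assumptions):

```java
public class StateLookup {
    // Rough state bounding boxes: name, minLat, maxLat, minLon, maxLon.
    // The real implementation loads these from a JSON file via GSON;
    // the two boxes here are rough illustrative values only.
    static final Object[][] BOXES = {
        { "Illinois", 36.97, 42.51, -91.51, -87.02 },
        { "New Hampshire", 42.70, 45.31, -72.56, -70.70 },
    };

    // Return the first state whose box contains the point.
    public static String stateFor(double lat, double lon) {
        for (Object[] b : BOXES) {
            if (lat >= (double) b[1] && lat <= (double) b[2]
                    && lon >= (double) b[3] && lon <= (double) b[4]) {
                return (String) b[0];
            }
        }
        return "unknown";
    }
}
```

Note that rough boxes overlap near state lines – exactly the NYC/DC caveat mentioned above – so the order of the list matters.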

In our case, the result is either New Hampshire or Illinois, depending on whether or not we remove Springfield, IL.

Other examples

Now, what tells us that somebody who states they are from “Springfield” simply likes the Simpsons?

Well, nothing. While it is advisable to store multiple potential location results and re-visit that data (or even use a different algorithm with a proper training set based on it), the architecture and algorithms work surprisingly well – some random profiling produced mostly accurate results, despite various input formats:

Sample Tweet locations across the US by quantity

(The big dot in the middle represents the location “USA”)

Original Location | Result | Accurate
Philadelphia | Philadelphia, Philadelphia County, Pennsylvania, United States of America | TRUE
Brooklyn, NY | BK, Kings County, NYC, New York, 11226, United States of America | TRUE
nebraska | Nebraska, United States of America | TRUE
United States | United States of America | TRUE
Truckee, CA | Truckee, Donner Pass Road, Truckee, Nevada County, California, 96160, United States of America | TRUE
Lafayette, LA | Lafayette, Tippecanoe County, Indiana, United States of America | TRUE
Minot, North Dakota | Minot, Ward County, North Dakota, United States of America | TRUE
Rocky Mountain hey! | Rocky Mountain, Harrisonburg, Rockingham County, Virginia, United States of America | TRUE
Living BLUE in Red state AZ! | Arizona, United States of America | TRUE
Earth Two, Darkest Timeline | Earth Tank Number Two, Fire Road 46, Socorro County, New Mexico, United States of America | FALSE
The Golden State | Golden, Jefferson County, Colorado, United States of America | FALSE
Atlanta, GA | Atlanta, Fulton County, Georgia, United States of America | TRUE
thessaloniki | Thessaloniki Jewelry, 31-32, Ditmars Boulevard, Steinway, Queens County, NYC, New York, 11105, United States of America | FALSE
newcastle | Newcastle, Placer County, California, 95658, United States of America | TRUE
Afton, VA | Afton, Lincoln County, Wyoming, 83110, United States of America | FALSE
Gary, IN / Chicago, IL | Chicago, Cook County, Illinois, United States of America | TRUE
Canada | Canada, Pike County, Kentucky, 41519, United States of America | FALSE
Southern California | Southern California Institute of Architecture, 960, East 3rd Street, Arts District, Little Tokyo Historic District, LA, Los Angeles County, California, 90013, United States of America | TRUE
San Francisco Bay Area | San Francisco Bay Area, SF, California, 94017, United States of America | TRUE
Southern CA | Southern California Institute of Architecture, 960, East 3rd Street, Arts District, Little Tokyo Historic District, LA, Los Angeles County, California, 90013, United States of America | TRUE

(All of these results come with latitude and longitude, state data and the full user profile and tweet metadata)

More importantly, tuning those results is just an exercise in careful profiling. We could filter out obvious countries that are not the US, tune the model parameters or the API calls.

Tweet locations across the US by avg. importance

(In this example, big points indicate a high confidence level; often points in the geographical center of a state hint that the user simply said they were from e.g. “Arizona”, “AZ” or “Living BLUE in Red state AZ!”)


While the example shown here is a simple proof of concept, extending it offers plenty of opportunities –

  • Fine-tune the model, filtering obvious outliers
  • Build a data model that connects location data with the tweets, all other available metadata
  • Store multiple salience values per analysis, tuning the model based on the data
  • Run the topology on scale and apply it to all tweets concerning a certain topic, analyzing the big picture and calculating for false positives
  • Run regression or other analysis over users entries with the same ID and potential mismatches, tracking changes in the location; write a pipeline that flags users which use their location and retrofit all old results to an accurate GPS
  • Store users without location data and apply the above logic to those

One thing we can conclude is that combining well-known, powerful local Big Data tools with managed, equally powerful Cloud solutions opens the door to a massive variety of new analytics opportunities that would have required a much higher level of involvement and cost only a few years ago.

The next steps will be to combine this data with the actual location results, create heatmaps, fine-tune the model, and eventually move the whole solution to the Google Cloud Platform.

Sample entity analysis

All development was done under Fedora 27 with 16 AMD Ryzen 1700 vCores @ 3.2 GHz and 32 GiB RAM. Nominatim planet data from 2018-03-15 was stored on a 3TB WD RED Raid-1 Array.

Software Stack: Heron 0.17.6, Google Cloud Language API 1.14.0, Google Cloud BigTable API 1.2.0, Google Cloud BigQuery API 0.32.0-beta, Google Data Studio, Nominatim 3.1.0, PostgreSQL 9.6.8

Continue Reading

Data Lakes: Some thoughts on Hadoop, Hive, HBase, and Spark

This article will talk about how organizations can make use of the wonderful thing that is commonly referred to as a “Data Lake” – what constitutes a Data Lake, how you probably should (and shouldn’t) use it to gather insights, and why evaluating technologies is just as important as understanding your data.


When organizations talk about the need to utilize data as part of their IT and business strategy, they usually have certain goals in mind. A common question usually boils down to “How can we make use of the data that we have available within our organization?”

While it might seem like a simple enough question to solve, the devil’s in the detail.

  • Where do we store the data? How many systems are currently in use? How are they connected to our business processes?
  • Who knows about the data? Who understands that data? Do we have metadata available?
  • Which technologies are in use? Who are the people working with these technologies?
  • Which systems interact with my Data Lake? Where’s my MDM, CRM, ERP – and where’s Waldo? How do they work, what kind of data do they store and process?
  • What are my ETL requirements? Who can design our data models? Which systems are responsible for these tasks?
  • What regulations are impacting our data?

Given you are able to answer these questions, the next logical step might be to start a consolidation effort. What used to be the realm of traditional Data Warehouse solutions is now commonly replaced by what we call a “Data Lake” – meaning an organized, central storage for all your organization’s useful data in a scalable, powerful repository, often realized by utilizing Hadoop or Cloud-based Big Data toolsets like Google BigQuery.

Now, let’s just assume you tackled all of these issues – your data is on your cluster, half a billion scheduled Sqoop jobs merrily import data, sanity checks, ETL, CDC, run like a charm, all regulations are taken care of, Kerberos is terrorizing IT departments across the globe and everything is living in one place. But… what now, exactly?

Utilizing your Data Lake

When it comes to actually using your Lake, things become really interesting. Usually, you would be utilizing one of the “standard” tools available with, say, Cloudera’s or Hortonworks’ respective distributions that enable you to access your data.

Just to pick a few –

  • Hive
  • HBase
  • Phoenix
  • Cassandra
  • Accumulo
  • Impala
  • Kudu
  • Spark
  • Pig

Every single one of those tools has different use cases and different requirements for both technical and human resources, and each typically fits a single, fairly specific type of data access. So – how can we decide what tool fits best?

In the next sections, we will cherry-pick technologies and talk about Hive, HBase (plus a bit of Phoenix), and Spark and apply common use cases on a semi-large (9.6M records) public data set. At the end, we will summarize our findings and performance tests.

A short prefix for Apache Hive

When it comes to Hadoop, the transition is hard. One might argue that one of the, if not the, hardest things about a transition from a traditional DWH to Hadoop is not the technology, but rather the way people use it. That being said, the first thing that comes to mind when you transition from relational systems to Hadoop might just be Apache Hive – a mostly ANSI-SQL compliant tool that allows you to fire SQL queries on your HDFS data.

First off: Hive is a fantastic piece of software that comes with a lot of flexibility. It runs on your laptop (well, it does on mine…), on a legacy cluster powered by good old M/R2 just like it does on a modern, Spark- or Tez-fueled powerhouse. It makes it easy to work with any data you might have, from legacy CSVs or AS400 Sqoop imports to complex Avro-magic.

So, why not just use Hive like you would, say, a Microsoft SQL Server?

Because Hive. Is. Not. An. RDBMS. I cannot stress this enough. Hive uses Schema-On-Read mechanics, does not have the concept of keys (and that is, well, key!), and is not ACID-compliant out of the box. You are also responsible for managing the underlying system. In the next section, we will dive into the details of what exactly that means.


We will use the following data set:

Data about the famous NYC cabs!

For the 3 months from 2017-04 to 2017-06, all Yellow Cab trips boil down to a total of 29,805,311 records in 2.5 GiB. If we start with June, we still get 813 MiB and 9,656,995 records.

As we will re-use the same data set for HBase later, you will need to make sure your HBase Key is unique – if you just use the first value in the CSV, VendorID, you will get a total of 3 records – as there are 3 distinct values in that CSV for this column. So we need to add an ID using awk. I suggest using head -1000 and piping that to a separate file for testing if you want to follow along.


awk -F',' -v OFS=',' '
 NR == 1 {print "ID", $0; next}
 {print (NR-1), $0}
' yellow_tripdata_2017-06.csv > yellow_tripdata_2017-06_id.csv


Now, we need to create a DDL and import the data:

CREATE TABLE nyc (
ID BIGINT
,VendorID INT
,tpep_pickup_datetime TIMESTAMP
,tpep_dropoff_datetime TIMESTAMP
,passenger_count INT
,trip_distance DOUBLE
,RatecodeID STRING
,store_and_fwd_flag STRING
,PULocationID INT
,DOLocationID INT
,payment_type INT
,fare_amount DOUBLE
,extra DOUBLE
,mta_tax DOUBLE
,tip_amount DOUBLE
,tolls_amount DOUBLE
,improvement_surcharge DOUBLE
,total_amount DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
TBLPROPERTIES ("skip.header.line.count"="1"); -- ignore the CSV header row

LOAD DATA INPATH 'hdfs:///user/cloudera/yellow_tripdata_2017-06_id.csv' OVERWRITE INTO TABLE nyc;

Let’s start off with ACID. Take the following example, where we try to manipulate Hive data:


Oops. Not good. Wait, let’s just enable transactions! However, you will have issues doing that using Cloudera’s distribution. Take this excerpt from the documentation:

“The CDH distribution of Hive does not support transactions (HIVE-5317). Currently, transaction support in Hive is an experimental feature that only works with the ORC file format. Cloudera recommends using the Parquet file format, which works across many tools. Merge updates in Hive tables using existing functionality, including statements such as INSERT, INSERT OVERWRITE, and CREATE TABLE AS SELECT.”

In the interest of transparency, here is how it looks on a manual Hive setup directly on Fedora (on a related note: Hortonworks’ HDP does support it out of the box):

Hive ACID Enabled

If you take a look at the official Hive documentation, the limitations become even more apparent:

  • Only ORC file format is supported in this first release.  The feature has been built such that transactions can be used by any storage format that can determine how updates or deletes apply to base records (basically, that has an explicit or implicit row id), but so far the integration work has only been done for ORC.
  • By default transactions are configured to be off.  See the Configuration section below for a discussion of which values need to be set to configure it.
  • Tables must be bucketed to make use of these features.  Tables in the same system not using transactions and ACID do not need to be bucketed. External tables cannot be made ACID tables since the changes on external tables are beyond the control of the compactor (HIVE-13175).
  • Reading/writing to an ACID table from a non-ACID session is not allowed. In other words, the Hive transaction manager must be set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager in order to work with ACID tables

While reasonable, it does strengthen my argument – Hive is not an RDBMS, even though you might be importing relational data. ACID compatibility is possible, but limited to some fairly specific use cases.

As long as you are using bucketed (often via a Primary Key from source), ORC-based, managed tables on a distribution that supports this, you are doing great. If you start with more uncommon Hadoop use cases, you might – will – run into issues.

So, how do organizations get around this? Usually, you can play The Tower Of Hanoi – meaning, using temporary tables and sub-selects to work with INSERT OVERWRITE statements, use a different storage and execution strategy that enables random writes and updates (more below) – or you can make this a core target and focus on matching the requirements outlined above.

When going for the first approach, there might be some….

Cannot insert into target table because column number/types are different

…errors and excluding columns still involves regex, but after all, a lot is possible if you put your mind to it.

In any case, sooner or later you will find yourself in need of working with your now accessible data. Whether you call this process “mapping” or “transformation”, or just throw your entire ETL workload on Hadoop, eventually you will want to gain some form of insight from your Lake – and usually, this involves Joins, Views, and other SQL operations.

Hive on 9,656,995 records

Let’s get back to our data. Always keep in mind that this data is currently stored as plain text.

Let’s try to get all trips with a tip over $100 – it’s NYC, after all:

with tmp as (
  SELECT * FROM nyc
  WHERE tip_amount >= 100
)
SELECT COUNT(*), max(tip_amount), total_amount as max_tip FROM tmp GROUP BY total_amount;

Hive on 9M records

Time taken: 66.138 seconds, Fetched: 185 row(s)

We got a result in about a minute on a single node (cluster configuration can be found down below)!

As you can see, I use a less-than-shiny, white-on-black CLI. So, let’s assume we have business users and analysts which are used to complex, powerful tools such as MS SQL Management Studio.

We can provide access to business users using Ambari (Hortonworks), Hue, Zeppelin or other web-based front ends.


However – storing queries, simply exporting results, auto-complete, stored procedures, special characters and (in most cases) visualization are not features that are commonly found on these tools.

You will need an external tool, usually connected via JDBC, for that. Examples include DBeaver, SQuirreL SQL, or DbVisualizer. Keep in mind that these tools need to talk to Hive (standard port 10000) and are usually Kerberos enabled – and generally speaking, Kerberos really, really hates nice things. It also hates bad things; in many cases, that would be Windows.

To get a bit more serious – transitioning your seasoned analysts to Hive is not an easy task. The technology is fundamentally different, the available tool sets do not offer the comfort many of us got used to over the years, and seemingly simple functionality appears to be missing. It is really challenging to explain that Hive does not, in fact, have the concept of “NULLable fields” or even Primary Keys, not to mention the ACID limitations highlighted above or features missing from other SQL dialects.

However: Hive offers different benefits. It is heavily optimized for huge datasets and will not blow up for very large tables (the 9M records from our example might be a good start for a VM, but are not a representative volume!). It can be extended by simply plugging more machines into your cluster, is fully Open Source, and can directly work on various other data sources that are not managed by Hive (External Tables, see below). It is integrated with basically any Big Data related technology (Sqoop even creates your tables and DDLs for you when importing data from an RDBMS), can be your entry point for Machine Learning on structured data, and serves as a key component in any Data Lake.

So: You should use Hive. But do not expect it to work like MariaDB or Oracle. Do not expect that setting up a Data Lake with a couple of Hadoop experts who know Hive in and out will automatically earn good feedback from business. As always, communication is key.

Use Hive to access your data, to transform your data (though not necessarily directly via the CLI), to provide external access via JDBC to end users (using the right tool – OpenSource or proprietary), and as major workhorse for your Data Lake for anything that even resembles relational data.

But remember – it is not an RDBMS. It was never meant to be that. It is not going to replace anything – it is a piece of technology that requires some careful planning and coordination between both IT and business. Maybe you even want to move your results to a different database, visualization tool or warehouse that works better with your Analyst’s toolset of choice – it all depends on what you are trying to do. But ultimately, you will be rewarded with a system which bridges the gap between abstract, distributed data and related algorithms such as MapReduce and the SQL we all learned to love and hate.

HBase and other non-relational databases

But obviously, there is a world beyond relational data. Enter the world of “noSQL”.

If you ever had the chance to use a production HBase cluster, there’s pretty much two scenarios. One: You love it because it was used exactly like it was meant to. Two: You hate everybody committing to it and burn Lars’ book on occasion (which, by the way, I can only highly recommend).

Non-relational systems – column-oriented stores like HBase and Accumulo, or document stores like MongoDB – are a fantastic concept. There is usually no concept of ACID transactions, but these systems are hellishly fast if used right.

If we use HBase, a de-facto standard on Hadoop, as an example, we can quickly see what “used right” means and apply some common access patterns on our NYC cab data. For details on HBase’s architecture, I will again refer to Lars George’s book, “HBase – the Definitive Guide” (O’Reilly Media).

In general terms, noSQL systems heavily rely on the concept of a key – and therefore do exactly what Hive omits for good reasons. Now, what is true about Hive – that it does not come with a fancy client – is especially true for HBase.

A seemingly simple query like “get me all records where X” turns into a long, horrible orgy of sub-commands over a charming black-and-white CLI.

We’ll re-use our NYC Yellow Cab data set (the Hive import from above moves the data set, so you will need to copy it to HDFS again).

hbase shell <<< "create 'nyc', 'cf'"

dos2unix yellow_tripdata_2017-06_id.csv

hdfs dfs -copyFromLocal yellow_tripdata_2017-06_id.csv /user/cloudera

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=, -Dimporttsv.columns="HBASE_ROW_KEY,cf:VendorId,cf:tpep_pickup_datetime,cf:tpep_dropoff_datetime,cf:passenger_count,cf:trip_distance,cf:RatecodeID,cf:store_and_fwd_flag,cf:PULocationID,cf:DOLocationID,cf:payment_type,cf:fare_amount,cf:extra,cf:mta_tax,cf:tip_amount,cf:tolls_amount,cf:improvement_surcharge,cf:total_amount" nyc /user/cloudera/yellow_tripdata_2017-06_id.csv

One might add that the initial import took plenty of time on the Quickstart Configuration, which, granted, is not very good. The command also exits with Bytes Written=0 if something is wrong (like a mismatch in the columns) – do not pass Go. Do not collect $200.

Again, let’s get all trips with a tip > $100.

scan 'nyc', {LIMIT => 10, FILTER => org.apache.hadoop.hbase.filter.SingleColumnValueFilter.new(Bytes.toBytes('cf'), Bytes.toBytes('tip_amount'), org.apache.hadoop.hbase.filter.CompareFilter::CompareOp.valueOf('GREATER_OR_EQUAL'), Bytes.toBytes(100))}

What’s that?

HBase Query Result

What happened here? Using the TSV import command, all the values are imported as Strings – and we are trying to compare Strings with Bytes. Technically correct, but not very insightful. There is a whole lot more to the way HBase stores its data – I will not go into that at this point. But just keep in mind that HBase does not really care about the data you throw at it.
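The effect is easy to reproduce in plain Java – lexicographically, “9.5” sorts after “100.0”, even though 9.5 < 100.0:

```java
public class StringCompareProblem {
    public static void main(String[] args) {
        // HBase's ImportTsv stores every value as the bytes of its string
        // representation. Comparing those bytes lexicographically gives
        // "9.5" > "100.0", because '9' sorts after '1'.
        System.out.println("9.5".compareTo("100.0") > 0); // string order
        System.out.println(9.5 > 100.0);                  // numeric order
    }
}
```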

So, let’s use Java, the next low-level abstraction HBase offers. We implement a custom Comparator to work with Double in String fields, move that jar to HBase’s classpath, restart HBase, and run our query as a jar file. Make sure to set



package com.otterinasuit.hbase;

import com.otterinasuit.hbase.comparators.ByteToDoubleComparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.TreeMap;

public class Main {
    private final static Logger logger = LoggerFactory.getLogger(Main.class);
    private final static double THRESHOLD = 100.00D;

    public static void main(String[] args) {
        try {
            long lStartTime = System.nanoTime();
            Configuration config = new Configuration();
            config.set("hbase.rpc.timeout", "1800000");
            // scanner timeout (assumed property name; the original was lost)
            config.set("hbase.client.scanner.timeout.period", "1800000");

            Connection connection = ConnectionFactory.createConnection(config);
            Table table = connection.getTable(TableName.valueOf("nyc"));

            // Filter: tip_amount >= THRESHOLD, compared as doubles
            SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes
                    .toBytes("cf"), Bytes.toBytes("tip_amount"), CompareFilter.CompareOp.GREATER_OR_EQUAL,
                    new ByteToDoubleComparator(THRESHOLD));

            // Scan with the filter attached
            Scan scan = new Scan();
            scan.setFilter(filter);

            // Run
            long c = 0;
            ResultScanner scanner = table.getScanner(scan);
            TreeMap<Double, result> list = new TreeMap<>();
            for (Result r : scanner) {
                String tip = Bytes.toString(r.getValue(Bytes
                        .toBytes("cf"), Bytes.toBytes("tip_amount")));
                String total = Bytes.toString(r.getValue(Bytes
                        .toBytes("cf"), Bytes.toBytes("total_amount")));
                System.out.println("tip_amount: " + tip);
                System.out.println("total_amount: " + total);

                double total_amount = Double.parseDouble(total);
                double tip_amount = Double.parseDouble(tip);
                list.put(tip_amount, new result(total_amount, tip_amount));
                c++;
            }

            // Calculate in the client
            double max_tip = Collections.max(list.keySet());
            result max_res = list.get(max_tip);
            System.out.println("Max tip was " + max_res.tip + ", amount was " + max_res.getTotal() + ", ratio: " + max_res.getRatio());
            System.out.println("A total of " + c + " people tipped over " + THRESHOLD);

            // Cleanup
            scanner.close();
            table.close();
            connection.close();

            long lEndTime = System.nanoTime();
            long total = (lEndTime - lStartTime) / 1000000;
            System.out.println(String.format("Runtime: %dms / %.1fmin", total, total / 1000.0 / 60));
        } catch (Exception e) {
            logger.error("Reading from HBase failed!", e);
        }
    }

    static class result {
    // ...
    }
}

Implementing the custom Comparator sounds like a simple exercise, but believe me, I had to read a lot of HBase source code in order to get that working. Also, Protobuf. We’ll forge that into a separate article.

Custom HBase Jar

Max tip was 440.0, amount was 473.3, ratio: 0.9296429326008874

A total of 185 people tipped over 100.0

Runtime: 284765ms / 4.7min

About 5x the runtime of Hive. I need to add at this point that using an HBase client application is not a very efficient way of working with this data in itself and that I threw together that jar in a couple of minutes (which is why it screams “Kill me!”) – but since my test environment is running on a single node anyways, we will let this slide.

However, this comes back to my point about keys and hence data access patterns. We are using a monotonically increasing, artificial, meaningless key. It’s like assigning each record in a pile of loose vinyls an increasing number and expecting to find something quickly. Sure, it works eventually – but not efficiently.

Let me demonstrate this by getting the (chosen-at-random) key “200000”:

HBase get

0.24s isn’t bad at all. Running a similar query in our (seemingly) quicker Hive table from above looks like this:

Hive HBase “get”

34.4s! In other words, 141x HBase’s runtime.

The same finding applies for scanning multiple rows – using

scan 'nyc', {STARTROW => '200000', ENDROW => '200020'}

to get all rows from 200,000 to 200,020 takes 1.9s in my example, the majority of which is spent printing to the shell.

In real life, we would probably use a meaningful HBase key, such as the trip’s time, if we are interested in analyzing the data based on date and time (at nanosecond precision in order to avoid duplicate keys). We could ask HBase “get me all trips from 2017-07-04 18:00:00.000 to 2017-07-05 01:59:59.000 and show me the average fare” if we want to take a look at 4th of July fares at night.
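If we went with such a time-based key, the construction is straightforward – a minimal sketch (class, method and key format are mine, purely illustrative, not from this project): zero-padding epoch seconds and nanoseconds makes the keys sort chronologically as plain strings, which is exactly what HBase range scans rely on.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical sketch: a sortable, time-based HBase row key at nanosecond
// precision, so that simultaneous trips do not collide on the same key.
public class TripRowKey {

    // Zero-padded epoch seconds plus nanoseconds sort lexicographically in
    // chronological order - exactly what STARTROW/ENDROW scans need.
    static String buildKey(long epochSeconds, long nanos) {
        return String.format("%010d.%09d", epochSeconds, nanos);
    }

    public static void main(String[] args) {
        // 2017-07-04 18:00:00 UTC, as in the example query above
        long start = LocalDateTime.of(2017, 7, 4, 18, 0, 0).toEpochSecond(ZoneOffset.UTC);
        System.out.println(buildKey(start, 0));
        // Keys compare chronologically as plain strings:
        System.out.println(buildKey(start, 0).compareTo(buildKey(start + 1, 0)) < 0); // true
    }
}
```

A scan from `buildKey(t1, 0)` to `buildKey(t2, 0)` would then return exactly the trips in that time window.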

Anyways – while HBase also answers my initial query, the Java method is highly inefficient as well:

  • The query runs client-side (as mentioned above)
  • We needed to essentially customize HBase or at least work around our data type issues (there’s other ways to achieve this, but your choice of technology will eventually limit your ability to gain these insights)
  • We need to write a custom program in Java
  • While it is almost easier than daisy-chaining commands in a shell, it is still fairly complex for such a simple query

SQL on noSQL

But what about SQL? Well, you can use Hive again by pointing an external table to HBase – I just don’t recommend it for this use case. First, we need an external table (a table that does not manage data, but rather just points to an existing data source), mapping our HBase columns to Hive columns:

CREATE EXTERNAL TABLE nyc_hbase (
 id INT
,VendorID INT
,tpep_pickup_datetime TIMESTAMP
,tpep_dropoff_datetime TIMESTAMP
,passenger_count INT
,trip_distance DOUBLE
,RatecodeID STRING
,store_and_fwd_flag STRING
,PULocationID INT
,DOLocationID INT
,payment_type INT
,fare_amount DOUBLE
,extra DOUBLE
,mta_tax DOUBLE
,tip_amount DOUBLE
,tolls_amount DOUBLE
,improvement_surcharge DOUBLE
,total_amount DOUBLE
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:VendorID,cf:tpep_pickup_datetime,cf:tpep_dropoff_datetime,cf:passenger_count,cf:trip_distance,cf:RatecodeID,cf:store_and_fwd_flag,cf:PULocationID,cf:DOLocationID,cf:payment_type,cf:fare_amount,cf:extra,cf:mta_tax,cf:tip_amount,cf:tolls_amount,cf:improvement_surcharge,cf:total_amount")
TBLPROPERTIES ("" = "nyc");

The query certainly is easier to manage, but the performance is not great either:

1       440.0   473.3

Time taken: 265.434 seconds, Fetched: 155 row(s)

4.4 minutes.

And the explanation remains the same: Column-based random-access is not what HBase was built for.

HBase is really fast on any access that involves a key, handles a lot of inserts and even “updates” (which are really overwrites), can manage billions of rows, and is incredibly flexible, as it does not dictate a fixed schema apart from ColumnFamilies. You can literally make up columns and data types as you go.

One alternative is Apache Phoenix, an SQL layer on top of HBase that takes care of a lot of optimization for you. I will not go into detail at this point – but the key finding (pun intended) remains the same. Using HBase while expecting random-read queries is a bad idea!

I guess what I am trying to say is – usability is not HBase’s primary target. And it doesn’t have to be. As my “ID 200,000” example shows, HBase wipes the floor with Hive when it comes to very precise lookups, its flexibility due to the lack of a schema is incredible and it will not let you down on very large data sets – if you make sure you know exactly what you are after.

Apache Spark

First off, Apache Spark is a general-purpose processing engine. Comparing it to other technologies is not really a good start, as it serves many purposes, such as –

  • Programming interface for data access, ETL, Machine Learning and more
  • Execution Engine (for Hive)
  • Streaming data
  • Machine Learning

I’ve used Spark many times and it remains one of my favorite go-to tools in the Big Data zoo – mostly because it can do almost anything and is usually very fast, thanks to some very smart optimizations and techniques under the hood.

Without further ado, we will replace M/R2 from the Hive example above with the Scala-based Spark shell (basically, a live shell you can get by running $ spark-shell) and fire the same query:

def show_timing[T](proc: => T): T = {
  val start = System.nanoTime()
  val res = proc // call the code
  val end = System.nanoTime()
  println("Time elapsed: " + (end - start) / 1000 / 1000 + " ms")
  res
}

val sql = """
with tmp as(
SELECT * FROM nyc -- Hive table from the example above (name assumed)
WHERE tip_amount >= 100
)
SELECT COUNT(*), max(tip_amount), total_amount as max_tip from tmp group by total_amount"""



Time elapsed: 12420 ms

12s for the exact same result. Of course – we already spent time on Hive prior to this, but a similar result with a slightly different performance profile could be achieved by reading the CSV from HDFS directly! Using Spark 2 makes this easy, but we can use the default 1.6 as well, as installing Spark 2 on Cloudera can be a bit tricky.

So we use the databricks-csv library:

spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

And this bit of code:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

def show_timing[T](proc: => T): T = {
    val start = System.nanoTime()
    val res = proc // call the code
    val end = System.nanoTime()
    println("Time elapsed: " + (end - start) / 1000 / 1000 + " ms")
    res
}

val schema = StructType(Array(
    StructField("id", IntegerType, true),
    StructField("VendorID", IntegerType, true),
    StructField("tpep_pickup_datetime", StringType, true),
    StructField("tpep_dropoff_datetime", StringType, true),
    StructField("passenger_count", IntegerType, true),
    StructField("trip_distance", DecimalType.Unlimited, true),
    StructField("RatecodeID", StringType, true),
    StructField("store_and_fwd_flag", StringType, true),
    StructField("PULocationID", IntegerType, true),
    StructField("DOLocationID", IntegerType, true),
    StructField("payment_type", IntegerType, true),
    StructField("fare_amount", DecimalType.Unlimited, true),
    StructField("extra", DecimalType.Unlimited, true),
    StructField("mta_tax", DecimalType.Unlimited, true),
    StructField("tip_amount", DecimalType.Unlimited, true),
    StructField("tolls_amount", DecimalType.Unlimited, true),
    StructField("improvement_surcharge", DecimalType.Unlimited, true),
    StructField("total_amount", DecimalType.Unlimited, true)))

val df ="com.databricks.spark.csv").option("header", "true").option("inferSchema", "false").schema(schema).load("hdfs:///user/cloudera/yellow_tripdata_2017-06_id.csv")

df.registerTempTable("nycspark")
val sql = """
with tmp as(
SELECT * FROM nycspark
WHERE tip_amount >= 100
)
SELECT COUNT(*), max(tip_amount), total_amount as max_tip from tmp group by total_amount"""



As you can see, the performance is not exactly a miracle and we had to manually define a schema, as we removed the CSV header – but keep in mind that this approach directly reads your CSV from HDFS, basically skipping Hive altogether. While aggregates might not be the key use case here, this is a very neat feature to work with various files directly on Spark.

In order to answer our “200,000 key” question: running the same query through Spark on Hive is not very convincing either:

Spark Hive “get”

Another 50s.

And for good measure, the same on the CSV:

Spark CSV “get”

And even better, knowing just a bit of Scala, we can do much more with that data – we could store it in a DataFrame, do more analysis on there, write it to disk, connect it to other sources, read from a completely different file from disk without any involvement of Hive whatsoever, combine those results, transform the code into a Real-Time application and much more.

We can use Spark on Python, Scala, Java and R, can run it from a web front-end such as Apache Zeppelin or just build mighty, full-size applications that involve build servers and more unit tests than you can imagine even in your wildest dreams.

But again – while the above example surely is simple, really complex Spark applications will result in full-sized software projects. But whether it is a simple Script or a complex application, Spark will serve your end users familiar with SQL (especially on a graphical tool like Zeppelin) as well as your Data Scientists and Engineers.

Talking about Spark in a couple of paragraphs does not do it justice, but the gist of it is: simple to use (although not as intuitive as a Microsoft SQL client), quick results, hard to master, and very powerful – but not exactly Microsoft SQL Management Studio either.


Let’s summarize: Using a fancy toolset doesn’t help anybody if it’s not used accordingly. Not surprising!

It gets even more apparent when we sum up our benchmarks:

Performance Overview

Hadoop does not magically solve your problems. Period. It does, however, provide a very powerful foundation for many years to come, especially when you think beyond your usual DWH Reporting targets.

Generally speaking –

Use Hive for “standard” queries – it gets as close to an RDBMS as you’re going to get. With ACID enabled, a smart use of Bucketing and Partitioning instead of traditional Primary Keys where applicable, and a good choice of ETL rules (avoiding VARCHAR join conditions, for instance), it serves as a powerful, easy-to-use access layer to your Data Lake, especially in combination with a tool like DBeaver. Do not expect it to make business users happy without proper Change Management.

It also opens the gate for much more advanced use cases: As starting or end point for your Machine Learning algorithms (which might very well combine structured and unstructured data), for instance.

HBase is clearly targeted at developers, but can also be exposed to end-users – if you know what you are doing. While amazing at certain things, HBase (and Phoenix) need to be used for use cases where you know your access and ingestion patterns – but if done right, it will not let you down.

Spark got (hopefully) a bit demystified. Spark, ultimately, is an Execution Engine. As such, it is, again, targeted at developers. However, using smart tools like Zeppelin and the SparkSQL free-form SQL interface, it can surely be used by many Database Architects and Power Users that are willing to handle a bit more coding than usual.

Independent of the toolset, the well-known Hadoop benefits still hold true today – runs on commodity hardware, super-scalable, very robust and Open Source.

So, is the concept of a “Data Lake” and especially Hadoop right for you? I’ll tell you what I tell all my clients: It depends. But it is certainly worth a look, especially if you expect your data to grow and your reporting needs to evolve.

There is a ton of topics I did not even mention in this 4,000 word article – but rest assured that a business that knows how to make use of more than their standard “reporting” interfaces and ERP- and CRM-extracts will gain a competitive advantage in the marketplace – as we all know, knowledge is key!

All tests were done on a single-node Cloudera Quickstart VM, running on QEMU/KVM with 12 AMD Ryzen 1700 vCores @ 3.2GHz and 25GiB RAM, image launched from a 3TB WD RED RAID-1 array under Fedora 26. yarn.nodemanager.resource.memory-mb was set to 4GiB, yarn.nodemanager.resource.cpu-vcores to 10.

Continue Reading

(Tiny) Telematics with Spark and Zeppelin

A V8 doesn’t need computers

Cars these days are quite the experience. Or to use Mike Skinner’s words, they are full of “Space Invader”-technology.

Even entry-level models have more wiring, computers and sensors built in than a medium-sized plane. While that might be a slight exaggeration, a modern car does collect data. A lot of it. Different sources, different numbers – this paper from 2014 quotes a dead link with “approximately 4 megabytes to over 5 gigabytes of data per vehicle per month”; other sources bring up even higher figures. However you put it – there is a lot of information available.

Location, speed, hundreds of sensors for tire pressure, MPG, wiper fluid, you get the idea. Some manufacturers might use this data, some insurance companies might and you might even see some of it in your car’s dashboard.

That’s the modern world. And then there’s my car. A 7-year-old, 130k mi battle-proven Ford Crown Victoria Police Interceptor with an all-American 4.6l V8 (“Turbos on an engine are just god’s way of telling you it ain’t got enough cylinders” –Mike Skinner, again, in character on The Grand Tour).

Not a connected car

And given its lack of fancy technology, it collects and shows exactly one metric to me: The fact that it actually has 130k miles on it. But what if we could get just a little bit more information? What if we could live up to the spirit of 2017 and not think about what we need, but what we might need? Introducing: A homegrown telematics solution!


To get a bit more serious: what exactly is telematics? In general, it means transmitting information via telecommunication networks; but when we use the term “telematics”, we usually talk about collecting and transmitting certain vehicle-related information to a corporate backend in order to enable a variety of use cases. Insurance companies use telematics data to offer discounts to people who drive responsibly, car manufacturers might use it to (anonymously) do all sorts of data mining and gather insights on driving profiles or defects, and for corporate fleets, it might be used for fleet management – the potential use cases are as complex as they are plentiful.

For the scope of this article, I would like to talk about what data we can gather and analyze by simply using a car and an iPhone – feel free to extend the idea; the Macchina M2 Kickstarter might be a good starting point. This article will lay the groundwork and give some ideas on how to do it at home.

My maximum speeds in Georgia

 Scope & Technology

For this project, here’s what we’d like to achieve:

  • Collect all available driving data while on the road – only a car and a phone are required
  • Send and store the data on a server controlled by the user
  • Analyze the data and get all insights from it that we reasonably can
  • Show the results in a dashboard

And here’s what we’ll use:

  • SensorLog, a $2.99 app available for iOS
  • Any form of web-server that accepts HTTP POST and can store data
  • Hadoop 2.7.3 for HDFS
  • Spark 2.0 for ETL and number crunching
  • Apache Hive for data access
  • Apache Zeppelin for visualization and exploration

Yes, this is still a Big Data blog, so we will use classical Hadoop-related technologies. But please be advised that this is intended to be a proof of concept for home use and not a corporate architecture for a huge, multi-user scenario. You will see why in just a second.

As I can already see the emails coming in: “Christian, why not just use any of the dozens of apps and startups offering exactly that? You can get an OBD stick with an app for a couple of bucks! And it can do so much more!”. And you know what? Entirely true! But this is a technology blog, first and foremost. Secondly, all data will wind up on your computers and not in any cloud or with a vendor who needs to analyze your data to make a profit.


Have a look at this very much simplified architecture diagram:

We will follow 5 major steps:

  1. Collect the data using SensorLog
  2. Transmit the data via your phone’s mobile network (in this case, via VPN)
  3. Accept & store it using node.js with express
  4. Extract, transform, analyze and persist the data using Spark 2.0 with SparkSQL on Hive
  5. Visualize the data using Apache Zeppelin with Spark, Hive and Angular.js

The architecture for this article is batch-driven via Oozie, but you could simply transform the Spark code into a SparkStreaming application to enable a real-time use case.

Further extensions or different tools are possible. You could replace node.js, use SparkStreaming with Kafka, put the data in HBase, have a mlib pipeline for Machine Learning set up – be creative.

Step 1: Collecting data

The first thing we need to define is how we collect data – and there’s an app for that. Bernd Thomas’ SensorLog for iOS collects all available metrics from your iOS device and is even able to send that data via TCP/IP sockets and HTTP, enabling our foundation for collecting data.

SensorLog collects 69 different metrics – basically everything your iPhone’s motion and GPS sensors have available. For the scope of this article, we will focus on time, location and speed.

In its standard configuration, it will gather data at a frequency of 30Hz, equal to roughly 1MB/minute in my tests. While that might not seem like much, do a bit of number crunching for your driving profile: at 50 minutes/day, roughly the average American commute, that would be 50MB a day just for your location profile. Including the weekends, that’s 1.5GB a month for a single car’s movements. Multiply that by any number of users and add an imaginary overhead for more information from e.g. OBD, and you quickly see why the cry for “Big Data” becomes legitimate.
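A quick sanity check of that back-of-the-envelope math (figures from the paragraph above; class and method names are mine):

```java
// Verifies the data-volume estimate from the text: ~1 MB/minute of sensor
// data, a 50-minute daily commute, 30 days a month (weekends included).
public class VolumeEstimate {

    static double gbPerMonth(double mbPerMinute, double minutesPerDay, int days) {
        return mbPerMinute * minutesPerDay * days / 1024.0;
    }

    public static void main(String[] args) {
        System.out.println(1.0 * 50.0 + " MB/day");                    // 50 MB/day
        System.out.printf("%.2f GB/month%n", gbPerMonth(1.0, 50.0, 30)); // ~1.46, i.e. roughly 1.5 GB
    }
}
```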

Granted – a modern car will be able to collect much more data than that, but we work with what we have.

Given iOS’ aggressive scheduler, the app needs to be running in order to collect data, making the process semi-automatic – you would have to start the app before and leave it open while driving. I’m sure Android has you covered if you need something that can run in the background, but for the sake of simplicity, we will live with it.

Step 2: Transmitting & storing data

In order to receive the data on our backend, you will need an HTTP server that accepts POST requests. I will not go into too much detail for this topic, as there are simply too many options available. Just make sure you limit the accepted IP range and/or a VPN and accept & store the incoming data somewhere. Here, we use CSV, as it is also SensorLog’s default format for storing the data on the phone.

// dependencies
var WebHDFS = require('webhdfs');
var express = require('express');
var fs = require('fs');

// App
var app = express();'/', function(req, res){
    console.log('POST /');
    res.writeHead(200, {'Content-Type': 'text/html'});

    // Write to HDFS or FS
    fs.writeFile('data.csv', req.body, function (err) {
      if (err) return console.log(err);
      res.end();
    });

app.listen(3000); // example port
For our article, I used node.js with webhdfs and had this running on my Debian-powered NAS, which sits behind an openWRT router, giving me the flexibility I needed for this. Basically, anything from a massive AWS EC2 cluster to a RaspPi would work for accepting data. In the real world, you could actually use node.js – it has proven to be quite scalable.

You won’t be able to use this node.js code – I am still trying to figure out the best practices, so use it as a rough guideline.

Step 3: Analyzing the data

As so often these days, Spark to the rescue. We will use Spark 2.0, as it offers a new API for working with DataFrames and CSV files. The example code is written in Scala, but feel free to use Java, Python or R.

The first step is to initialize our SparkSQL context and build the CSV’s schema using Spark’s org.apache.spark.sql.types.StructType. We use a combination of String and Double values for this. We then load the data from a pre-defined directory on the local file system, HDFS would naturally be preferred for larger amounts of data.

val sc = SparkSession
  .builder()
  .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse")
  .getOrCreate()

// Build the schema
// I honestly don't know how to dynamically infer the schema by type - the old spark-csv library from databricks was able to do that as far as I recall...
val schemaStringMeta = "loggingTime loggingSample identifierForVendor deviceID locationTimestamp_since1970"
val schemaStringData = "locationLatitude locationLongitude locationAltitude locationSpeed locationCourse locationVerticalAccuracy locationHorizontalAccuracy locationHeadingTimestamp_since1970 locationHeadingX locationHeadingY locationHeadingZ locationTrueHeading locationMagneticHeading locationHeadingAccuracy accelerometerTimestamp_sinceReboot accelerometerAccelerationX accelerometerAccelerationY accelerometerAccelerationZ gyroTimestamp_sinceReboot gyroRotationX gyroRotationY gyroRotationZ motionTimestamp_sinceReboot motionYaw motionRoll motionPitch motionRotationRateX motionRotationRateY motionRotationRateZ motionUserAccelerationX motionUserAccelerationY motionUserAccelerationZ"
val schemaStringActivityStrings = "motionAttitudeReferenceFrame motionQuaternionX motionQuaternionY motionQuaternionZ motionQuaternionW motionGravityX motionGravityY motionGravityZ motionMagneticFieldX motionMagneticFieldY motionMagneticFieldZ motionMagneticFieldCalibrationAccuracy " +
"activityTimestamp_sinceReboot activity activityActivityConfidence activityActivityStartDate pedometerStartDate pedometerNumberofSteps pedometerDistance pedometerFloorAscended pedometerFloorDescended pedometerEndDate altimeterTimestamp_sinceReboot altimeterReset altimeterRelativeAltitude altimeterPressure IP_en0 IP_pdp_ip0 deviceOrientation batteryState batteryLevel state"

// String
val fieldsMeta = schemaStringMeta.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))

// Double
val fieldsData = schemaStringData.split(" ")
.map(fieldName => StructField(fieldName, DoubleType, nullable = true))

// String
val fieldsActivity = schemaStringActivityStrings.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fieldsMeta ++ fieldsData ++ fieldsActivity)

// Read the input files
val input ="csv").schema(schema)
  .load("file:///path/to/sensorlog/data") // pre-defined input directory (adjust to your setup)
  .filter(col => !col.get(0).equals("loggingTime"))
input.createOrReplaceTempView("input")

Next, we try to cluster our raw data into trips. A trip is defined as a continuous flow of data. If the data collection stops, a new trip segment will be started.

To do that, we use SparkSQL’s windowing functions to analyze more than one row per query. The basic idea is to split the DataFrame whenever a certain metric’s difference between consecutive rows meets or exceeds a threshold (i.e. x_{n+1} − x_n ≥ t, where x is the metric, t is the threshold and n is the row offset).

One approach for this is to use the timestamp, another would be to check if your current speed remains 0m/s for more than x seconds or a combination of the two.
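Stripped of Spark and SQL, the splitting idea itself is tiny – a minimal stdlib sketch of the timestamp variant (all names mine, purely illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Spark-free sketch of the trip-splitting idea: whenever the gap between
// consecutive timestamps exceeds a threshold, a new trip begins. Each data
// point is labeled with its trip's first timestamp as the trip id.
public class TripSplitter {

    static List<Long> assignTrips(List<Long> timestamps, long thresholdSeconds) {
        List<Long> trips = new ArrayList<>();
        long currentTrip = timestamps.isEmpty() ? 0 : timestamps.get(0);
        for (int i = 0; i < timestamps.size(); i++) {
            if (i > 0 && timestamps.get(i) - timestamps.get(i - 1) > thresholdSeconds) {
                currentTrip = timestamps.get(i); // gap too large: start a new trip
            }
            trips.add(currentTrip);
        }
        return trips;
    }

    public static void main(String[] args) {
        // Two points 1s apart, then a 300s gap with a 60s threshold -> second trip
        List<Long> ts = List.of(1000L, 1001L, 1301L);
        System.out.println(assignTrips(ts, 60)); // [1000, 1000, 1301]
    }
}
```

The lag()-based SQL below expresses the same comparison as a window over the DataFrame, which is what lets Spark do it at scale.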

    var cols = "loggingSample, identifierForVendor, deviceID, locationTimestamp_since1970, locationLatitude, locationLongitude, locationAltitude, locationSpeed, locationCourse"
    // The number of rows to lag can optionally be specified. If the number of rows to lag is not specified, the lag is one row.
    val lag = 1
    val splitBy = "locationTimestamp_since1970"

    // Add a column that includes the next row in a window, i.e. the next timestamp
    val dfDiffRaw = sc.sql("SELECT "+cols+", " +
      "(lag("+splitBy+", "+lag+") " +
      "OVER (PARTITION BY identifierForVendor ORDER BY "+splitBy+" ASC)) AS ts1 " +
      "FROM input")
    // Calculate the timestamp difference to the previous data point
    // Convert EPOCH to normal format for Hive partionining
    val dfDiff = dfDiffRaw.filter(dfDiffRaw.col("ts1").isNotNull)
      .withColumn("diff", dfDiffRaw.col("locationTimestamp_since1970")-dfDiffRaw.col("ts1"))
      .withColumn("year", org.apache.spark.sql.functions.from_unixtime(dfDiffRaw.col("locationTimestamp_since1970")).substr(0,4))
      .withColumn("month", org.apache.spark.sql.functions.from_unixtime(dfDiffRaw.col("locationTimestamp_since1970")).substr(6,2))
    dfDiff.createOrReplaceTempView("diffInput")

    // Cluster the data based on a minimum idle time between data points
    // If you did not collect data for x seconds, start a new "trip"
    val minTripLengthInS = 60
    val tripBy = "diff"
    cols = cols + ", year, month"
    val fullTrips = sc.sql("select "+cols+", diff, segment, " +
      " sum(case when "+tripBy+" > "+minTripLengthInS+" then locationTimestamp_since1970 else 0 end) " +
      " over (partition by identifierForVendor order by "+splitBy+") as trip" +
      " from (select "+cols+",diff," +
      "             lag("+tripBy+", "+lag+") " +
      "             over (partition by identifierForVendor order by "+tripBy+" asc) as segment" +
      "      from diffInput" +
      "     ) ue")

This adds a trip column, in our case just the first timestamp of the trip.

Last but not least we calculate certain KPIs that might be interesting later on. More specifically, we want to see the following KPIs per trip:

  • Top speed
  • Average speed
  • Points where we went over the speed limit

For the latter, we abstract the concept and define an absolute speed limit – for Georgia and Florida (where I collected my sample data), that would be 70mph (or 31.3m/s). In a real life scenario, you would want to think about matching the GPS location to a map provider that offers specific speed limits – because 70mph in a 30mph zone is not a good idea.
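For reference, that 31.3m/s figure is just a unit conversion – a quick sketch (names mine):

```java
// Converts the absolute speed limit from the text: 70 mph ~= 31.3 m/s.
public class SpeedLimit {
    static final double METERS_PER_MILE = 1609.344;

    static double mphToMetersPerSecond(double mph) {
        return mph * METERS_PER_MILE / 3600.0;
    }

    public static void main(String[] args) {
        System.out.printf("70 mph = %.1f m/s%n", mphToMetersPerSecond(70)); // 31.3
    }
}
```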

    val trips = fullTrips.groupBy("trip")

    // Calculate KPIs
    val avgSpeeds = trips.avg("locationSpeed").withColumnRenamed("avg(locationSpeed)", "avgSpeed")
    val topSpeeds = trips.max("locationSpeed").withColumnRenamed("max(locationSpeed)", "topSpeed")

    val result = fullTrips.join(avgSpeeds, "trip").join(topSpeeds, "trip")

    // Save


Top and average speed are simple – group the result by trip, use avg() and max() and join the result with the original DataFrame on column “trip”.
The result winds up in a Hive table called “telematics”.

Accessing the data

For Hive, make sure you have HiveServer2 running and not just a local Hive with Derby, as we will be using JDBC for our visualization. Verify that setup with

mysql.server start
beeline> !connect jdbc:hive2://localhost:10000


Sidenote: If you get something along the lines of “Plugin ‘*4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC’ is not loaded” – restore the hive user in MySQL. Setting up HiveServer2 on a Mac is not fun.

Visualizing the data

First off, I tried using R with Shiny, but failed. See my StackOverflow post here. An alternative is Tableau, as it can use JDBC as of version 10, but that might be a bit pricey for a small PoC on your local machine.

Example trips in Georgia

For the sake of simplicity, we will use Apache Zeppelin. It is standard in Hortonworks HDP, but also runs fine standalone. To configure Zeppelin, refer to your environment; in my case, I had to manually configure JDBC to use the Hive driver and reference said jar in ZEPPELIN_CLASSPATH via the file, among other details.

We use this angular.js example to visualize the data on a map.

In order to do that, we pull the data back from Hive using SparkSQL on Scala directly in Zeppelin and map the result to a “trip” object, which we bind to the global scope using z.angularBind().

val data = sqlContext.sql("select * from telematics")
val fullTrips = data.collect

case class trip(id: String, ts: String, speed: Double, lat: Double, long: Double)
// keep only every 10th data point so the client-side map stays responsive
val items = fullTrips.zipWithIndex.filter(_._2 % 10 == 0).map(_._1)
// column positions assumed to follow the case class layout
val result = => trip(r.get(0).toString, r.get(1).toString, r.getDouble(2), r.getDouble(3), r.getDouble(4)))

z.angularBind("locations", result)

Since the map is rendered on the client, we keep only every nth data point before actually showing it:

Example trip in Atlanta, GA with Spark

As we can see in this example, I went around 70mph on 01/08/17.

We can also use Zeppelin to analyze our data with any other form of SQL and visualization. For instance, in this example, we took all trips currently in the system and plotted them against their top and average speeds.

Average and top speeds by trip

There’s more to be done here, of course – a fun exercise would be to link my profile (which is connected to Spotify) with my top speeds. Now is Watain making me floor it? Or does a CCR playlist have the same effect?


This is a very simple showcase. It uses a variety of technologies to bring a bit of connectivity to my old Crown Vic, but is it worth it?

Absolutely not. Sure – it is a cool proof of concept for me to have a look at how and where I drive, to explore the data we can generate using an iPhone, and to find out that there are a couple of 1-mile trips to Popeye’s (which maybe, just maybe, one could avoid). But after all, it is just one car with location and speed data.

But: As soon as we extend this concept by collecting actual OBD data (see the Kickstarter link above!), link more data to it or add more users, we have a very interesting concept on our hands. The more actual insights we are able to collect, the more we can learn. How do my car’s sensor metrics relate to my driving style? What is my driving profile, and how does it relate to that of my friends and their cars? Is there open maintenance, should I schedule it, does it have anything to do with how I drive? If we can enrich the GPS data with speed limits, how much would the police like my driving style?

The foundation is there. We can collect, send, format, store and analyze data. Adding just more data to it is trivial (given we are able to extract it).

But for now – a fun little project and maybe a foundation for more.

The full source is available on GitHub.

Continue Reading

Storm vs. Heron – Part 2 – Why Heron? A developer’s view

This article is part 2 of an upcoming article series, Storm vs. Heron. Follow me on Twitter to make sure you don’t miss the next part!

In the last part of the series, we looked at how to transform your existing Storm topologies to Twitter’s new distributed streaming- and analytics-framework, Heron. In this part of the series, we will actually see why you would want to do this. This part will see Storm from a developer’s view, whereas the next part will focus on operations & maintenance.

Why Storm leaves stuff to be desired

First off, let me start by stating that Apache Storm surely is a great and flexible framework, but it often doesn’t quite satisfy all the requirements an enterprise-ready Big Data component should fulfil. Its weaknesses show in both its architecture and the way an enterprise can handle Storm development, testing, deployment and maintenance.

The things one could objectively criticize about Storm boil down to the following points:

  • Unified resource assumption & performance (Developers)
  • Insufficient resilience features (Developers)
  • Insufficient debugging abilities (Operations)
  • Insufficient monitoring abilities (Operations)

Naturally, these all depend on your businesses’ specific requirements for the use cases you want to build – but generally speaking, these points should affect the majority of users.

Some points might be simplified for the sake of accessibility to the topic.

Unified resource assumption & performance

First, let’s have a look at how Storm structures its minions:

Storm worker architecture

The top-level hierarchy consists of one (< Storm 1.x) or multiple, highly-available (>= Storm 1.x) so-called Nimbus servers which coordinate your topologies. These Nimbus servers communicate with your second level, the supervisor nodes executing your code. An optional UI node as well as an optional log viewer process can be deployed as well.

Every supervisor runs multiple JVMs (in so-called workers), which then again execute your actual code fragments in their respective executors. And here’s the first big issue with this design – you have absolutely no ability to properly control your resource usage. In your respective topologies, you can freely set different numbers of executor processes for your bolts and spouts, but have no ability to control where your process will end up1. It is possible that a long-running, complex part of your topology shares a JVM with a simple calculation component. It becomes almost impossible to isolate and profile these tasks separately (I actually once wound up writing a rather complex parser in R, iterating over JVM dumps, logs etc. for that task).

Also, if one of the more complex bolts crashes (e.g. an uncaught exception), it takes down completely unrelated tasks with it!

Storm JVM architecture

Furthermore, all JVMs have a fixed memory setting – depending on the most RAM-intensive component currently running. The resources this JVM holds are not linked with any other Hadoop scheduling component, such as YARN or Mesos, making it almost impossible to use shared nodes (e.g. Storm supervisors + HDFS datanodes) if you care about resource allocation2.

In reality, this basically forces you to think about deploying entirely separate Storm clusters per topology. This may sound drastic, but makes sense if you think about it.

Imagine a topology with two main parts – a spout taking up 10GB of memory and a bolt that needs 5GB to do its job. If you now run 1 spout and 3 bolts, you will need to allocate 30GB, as every JVM must be sized for its worst case of at least 15GB (one spout plus one bolt), while the actual requirement is only 25GB (1*10+3*5). This wastes 5GB of memory. And yes, this example is blatantly taken from this paper (which I highly recommend reading).
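The arithmetic is easy to verify in a few lines of shell (numbers taken from the example above; that two worker JVMs host the four executors is my assumption about how Storm packs them):

```shell
spout_mem=10   # GB needed by the spout
bolt_mem=5     # GB needed by each bolt
# Every worker JVM gets the same fixed memory setting, sized for the
# worst case: one spout plus one bolt sharing a JVM.
jvm_mem=$((spout_mem + bolt_mem))         # 15 GB per worker
workers=2
allocated=$((workers * jvm_mem))          # 30 GB reserved
needed=$((1 * spout_mem + 3 * bolt_mem))  # 25 GB actually required
echo "wasted: $((allocated - needed)) GB" # wasted: 5 GB
```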

Now, there’s more “under the hood” stuff that Storm isn’t exactly smart about, for instance…

  • A lack of backpressure
  • Overly harsh load on Zookeeper by filling it with heartbeats and states
  • A lot of useless threads for sending and receiving tuples
  • A lot of GC action
  • Merged log files per worker


Now, Heron is a lot smarter about basically all of this.

Heron Architecture

Heron uses Aurora, a framework atop of Mesos, to schedule its topologies. This solves your dilemma of Storm allocating its resources independently. YARN is also an option for existing setups.

Heron runs user code and its managers (used for coordinating the topology’s components) in so-called containers. If you’ve ever worked with YARN / M/R2, you know the general idea.

Within these containers, Heron Instances run the user code in separate JVMs. With this approach, dedicated resource allocation, profiling and debugging is possible. It also enables us to have dedicated logs and access these JVMs in the UI.

A Stream Manager now efficiently manages the tuple communication and backpressure and a Topology Master communicates metadata with Zookeeper.

Metadata is kept in Zookeeper; not all heartbeats and transactions like in Storm.


Naturally, this has an impact on performance as well. Heron claims to be “up to x14 faster” than Storm. And who am I to doubt them?

Well, personally, I don’t fall for marketing that aims to throw big numbers at me – performance in real-life scenarios is limited by hundreds of factors, for instance I/O implementations and limits, network throughput, API limits and so on and so forth.

But still, running my example topology from Part 1 in single-node mode (i.e. locally), without Mesos, with a limited Twitter API and a horrible HDFS implementation on a single AWS r3.xlarge (Intel Xeon E5-2670, 4 vCores, 30GiB RAM) instance gave me 60% more throughput – which makes me believe Twitter’s “x14” claim (for WordCount) much more.


So, if you activate Aurora, run it in an actual distributed mode and optimize the known bottlenecks (HDFS, Twitter API), I can see this thing performing much better.

And even if it doesn’t – without Heron, I would barely be able to even think about profiling my bolts, because everything is mushed into an unreadable, nasty array of JVMs.

Heron: 1. Storm: 0.

Insufficient resilience features

Resilience is a term many enterprises love to use, and it can mean many things. For some businesses, it translates to “stateful processing”, for others to “stateful & exactly-once processing”. Some just refer to the architecture of the framework and cluster setup (what happens if a node breaks down or is not available?). In the end, it means that our topologies need to produce results – data – that the business can use, and usually that they manage errors. And you, as an engineer, need to think about how to do that with the framework at hand.

System failures

Storm manages some of these requirements fairly well – as long as you give it enough supervisor machines to play with. If one of these machines stops sending heartbeats via Zookeeper, it stops being available for your topologies. Storm (read: the nimbus) re-allocates the resources to the existing machines. As soon as the machine becomes available again, Storm will make it part of its supervisor network again. And if you use a Storm version >= 1.0.x, you can also remove the SPoF of the nimbus.

Stateful processing with Trident or Storm

Stateful processing, however, is a whole different story. Usually, a tuple is stateless and its lifetime is managed by the acknowledgement mechanism you chose to implement. Let me be blunt and quote from the official documentation:

“There’s two things you have to do as a user to benefit from Storm’s reliability capabilities. First, you need to tell Storm whenever you’re creating a new link in the tree of tuples. Second, you need to tell Storm when you have finished processing an individual tuple. By doing both these things, Storm can detect when the tree of tuples is fully processed and can ack or fail the spout tuple appropriately. Storm’s API provides a concise way of doing both of these tasks. Specifying a link in the tuple tree is called anchoring”

In other words, you rely on your queue (Kafka, JMS, Kestrel, Twitter…) for managing your resilience. If something fails – and stuff will fail eventually – you need to re-process the entire tuple-tree. Technically speaking, this process is still stateless, but more secure than just hoping your topology won’t mess something up.

To actually achieve stateful processing, i.e. storing information about the progress and status of a tuple, you have two options left: Storm’s new Stateful API or Trident. Trident offers you many great features, but is another layer atop of Storm you will need to manage and understand. See this article from the documentation for further reference.

Storm’s stateful API is pretty new and relies on a KeyValueStore and a persistence layer of your choice (currently, that would boil down to Redis). If your worker crashes, Storm can then go ahead, read from said store and re-process the tuples.

All of these mechanisms require you to put quite a lot of thought into your design. If you compare this with the way SparkStreaming3 handles this – by supplying a checkpoint directory on HDFS – you quickly notice why one might be sceptical about Storm’s way of doing things.



Now, here’s the deal with Heron – while it aims to improve many things “under the hood” compared to Storm (architecture, performance, monitoring, debugging …), it still is fully compatible with Apache Storm (see Part 1 of this series). While this is arguably a huge benefit for corporations using Storm right now, it does not actually impact the development model too much – in other words, resilience will still be a topic you have to think about yourself. Now, if I’m the one who missed a killer feature that Heron introduced for this, let me know!


So, what can we take away from this as developers? First of all, we need to acknowledge (no pun intended) that Heron is built for using Storm code – with all its up- and downsides. While the basic programming model of Storm is fairly simple, it quickly gets complicated when we need to throw catchphrases like “states”, “WAL” or “exactly once” into the discussion.

However: Just executing the existing code renders many benefits. We get better performance even in extremely simplified setups, we utilize our machine’s resources much better than Storm ever could (saving $$ in the process!) and potentially save ourselves several headaches when it comes to crashing JVMs and other fun-failures you encounter on any Big Data Cluster.

In the next part, the benefits will get even more obvious – when we take a detailed look at how you can actually operate, monitor and eventually debug a Storm vs. Heron cluster in production environments.



  1. Not entirely true per se – you can look into things like the isolation scheduler, but that is basically working around a core issue in the software’s design.
  2. Storm on YARN is possible, for instance via Hortonworks’ Big Data distribution, albeit not out of the box.
  3. I don’t see SparkStreaming as a “holy grail” of frameworks. Yes, it receives more updates, yes, it seems more mature in many ways, but it is far from being an alternative for Storm/Heron/Flink/Samza for any case. But this would require me to rant in another article.

Storm vs. Heron, Part 1: Reusing a Storm topology for Heron

This article is part 1 of an upcoming article series, Storm vs. Heron. Follow me on Twitter to make sure you don’t miss the next part!

When upgrading your existing Apache Storm topologies to be compatible with Twitter’s newest distributed stream processing engine, Heron, you can just follow the instructions over at Heron’s page, as Heron aims to be fully compatible with existing Storm topologies. But is it really that simple? I’ve tried that, using my very much real topology and was not overly surprised by the result.


So, here’s my setup:

  • Hadoop 2.7.3 (HDFS + YARN)
  • Apache Storm 1.0.2
  • Twitter Heron 0.14.3
  • macOS Sierra 10.12

All my dependencies come from using

$ brew install


First off, the topology I’m using is my Twitter Analysis topology that reads in tweets (filtered by keywords, using twitter4j), runs a couple of analysis queries on these (based on a prototypical word list from pre-defined reference accounts, basically sh*tty, supervised machine learning for non Data Scientists) and persists them into HDFS, using the storm-hdfs library.

I’ll come to the details in a later article; for now, all we need to know is that we use the most recent, stable versions of Storm and an external library that does what Storm does best: stream analysis and simple storage.

The process

What Twitter recommends is simple: Install Heron, remove the following dependency


from your Maven pom.xml and replace it with Heron’s dependencies. Sounds too amazing to be true, considering that updating from Storm 0.x to 1.x required us to do actual code changes! Not to mention the way easier deployment, configuration and under-the-hood changes Heron comes with. But unfortunately, not everything went as smoothly as planned…
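For reference, the swap boils down to a pom.xml change along these lines – the Heron group/artifact ids and versions here are my assumption based on the Heron docs of that era, so verify them against your install:

```xml
<!-- remove Storm's core dependency: -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>1.0.2</version>
</dependency>

<!-- and replace it with Heron's (artifact names and versions assumed): -->
<dependency>
  <groupId>com.twitter.heron</groupId>
  <artifactId>heron-api</artifactId>
  <version>0.14.3</version>
</dependency>
<dependency>
  <groupId>com.twitter.heron</groupId>
  <artifactId>heron-storm</artifactId>
  <version>0.14.3</version>
</dependency>
```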



As of right now, Heron seems not to be compatible with storm-hdfs (albeit stating otherwise on the project’s GitHub in the past), judging by this error:

java.lang.NoClassDefFoundError: org/apache/storm/utils/TupleUtils

You have a couple of options for this, I guess. Heron does not seem to be able to work with pre-compiled classes containing Storm objects, so your best shot would be to grab the library’s source and integrate it into your project as regular classes and packages.

The other option is re-writing the library. While that seems like an expensive endeavour, it may prove useful if you require more control or flexibility anyways. As I did this in the past with the storm-hbase library for those exact reasons, I’m pretty sure that this is far from a viable option for everyone, but surely it will work for some.

Considering the sheer amount of external libraries for Storm (kafka, hdfs, hbase, mqtt, hive, jms, redis …), this could turn out to be a real problem, though. So, if you know of a smarter alternative, let me know!

Update: Thanks to the Heron team over at Twitter, the external systems for Heron are work in progress!


Also, twitter4j did not exactly leave me happy. Whilst my simple spout worked, it was not able to emit the twitter4j Status interface representing a tweet, as it usually comes in the form of a twitter4j.StatusJSONImpl.class, which is not visible from outside the twitter4j package. As Heron uses kryo for this, I was not able to register it with the Heron configuration:


The solution to this was fairly simple, after the Reflections library failed me as well: I used a custom wrapper object, emulating the Status interface and hence remaining compatible with the rest of my code. Not pretty, but works – and required me to touch code after all. Remember to register these classes as well!

The asm library

That one was tricky – after fixing the previous errors, I was left with this:

IncompatibleClassChangeError: Found interface org.objectweb.asm.MethodVisitor, but class was expected

As it turns out, the method call you see there used to be an interface call in asm 3.x, but was switched to a class in 4.x+. Adding this to my pom.xml fixed that one as well:



Heron surely is awesome when we compare it to Storm – at least in theory. While the idea of literally re-using your entire source with a new, better technology seems thrilling, reality looked a bit less exciting. Anyways, after a couple of hours of debugging and using fixes that maybe could be done a bit less brutally, it is now running beautifully on my local machine. Time to test it on a cluster!

Heron UI


Heron is in development and things change quickly. These issues might not be true for your setup and there might be a perfectly viable solution to all of this – but I wasn’t able to find it and hence decided to document my experiences here.


Update an HBase table with Hive… or sed

This article explains how to edit a structured number of records in HBase by combining Hive on M/R2 and sed – and how to do it properly with Hive.

HBase is an impressive piece of software – massively scalable, super-fast and built upon Google’s BigTable… and if anyone knows “How To Big Data”, it ought to be these guys.

But whether you are new to HBase or just never felt the need, working with its internals tends not to be the most trivial task. The HBase shell is only slightly better in terms of usability than the infamous zkcli (ZooKeeper Command Line Interface), and once it comes to editing records, you’ll wind up with tedious, long and complex statements before you revert to using its (Java) API to fulfill your needs – if the mighty Kerberos doesn’t stop you before.

But what if… there were a way that is both stupid and functional? What if it could be done without a programmer? Ladies and gentlemen, meet sed.

sed (stream editor) is a Unix utility that parses and transforms text, using a simple, compact programming language.

The scenario

Let’s say you have a table with customer records on HBase, logging all users’ interactions on your platform (your online store, your bookkeeping, basically anything that can collect metrics):

key             cf:user  cf:action  cf:ts       cf:client
142_1472676242  142      34         1472676242  1
999_1472683262  999      1          1472683262  2

Where: cf:user is the user’s id, cf:action maps to a page on your system (say 1 equals login, 34 equals a specific screen), cf:ts is the interaction’s timestamp and cf:client is the client, say 1 for OS X, 2 for Windows, 3 for Linux.

Let’s ignore the redundancy in timestamp and user or the usability of the key – let’s stick to simple design for this.

One of your engineers (surely not you!) f@#!ed up, and your mapping for cf:client is broken – between 2016-08-22 and 2016-08-24, every record’s “client” value is one off! 1 is now 0 (making me use an NaN OS), what should be 2 is now 1 and maps to OS X, what should be 3 is now 2 and maps to Windows – and suddenly, Linux is off the charts. But don’t panic – Hive to the rescue!

Set the stage

Here’s what we’ll use:

  • Hadoop 2.6.0 (HDFS + YARN)
  • HBase 1.2.2
  • Hive 2.1.0
  • A custom create script for the data
  • OS X 10.11.6

And here’s what we’re going to do: First, we’ll create our HBase table.

hbase shell
create 'action', 'cf', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}


Bam, good to go! The table’s empty – but we’ll get to that in a second.

In order to do anything with our table, we need data. I’ve written a small Spark app that you can get here and run as such:

git clone ...
mvn clean install
cd target
spark-submit \
--class com.otterinasuit.spark.datagenerator.Main \
--master localhost:9000 \
--deploy-mode client \
CreateTestHBaseRecords-1.0-SNAPSHOT.jar \
2000000 \ # number of records
4 # threads 

Let’s run it with a charming 2,000,000 records and see where that goes. On my mid-2014 MacBook Pro with i5 and 8GB of Ram, the job ran for 9.8 minutes. Might want to speed that up with a cluster of choice!

If you want to check the results, you can use good ol’ mapreduce:

hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'action'


hbase shell
count 'action', INTERVAL=>100000


Granted, you would not have needed Spark for this, but Spark is awesome! Also, its overhead in relation to its performance benefits beats shell or Python scripts doing the same job in many daily use cases (surely not when running on a tiny laptop, but that’s not the point). I was forced to use some semi-smart approaches (Spark MLlib for random data and changing the type with a map() function might be one) – but hey, it works!

Next, an external Hive table – the baseline for all our Hive interactions with HBase.

CREATE EXTERNAL TABLE test.action(
key string
, userf int
, action int
, ts bigint
, client int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key
, cf:user#b
, cf:action#b
, cf:ts#b
, cf:client#b")
TBLPROPERTIES("hbase.table.name" = "action");

Unless you are trying to abuse it (like we’re about to do!), Hive is a really awesome tool. Its SQL-like query language, HQL, comes really naturally to most developers, and it can be used with a big variety of sources and for an even bigger variety of use cases. Hive is, however, a tool that operates on top of MapReduce2 by default and, with that, comes with a notable performance overhead. So do yourself a favor and have a look at Impala, Tez and Drill, or at running Hive on Spark. But let’s abuse Hive with M/R2 a bit more first.

Hive magic

We now have an HBase table, a Hive external table and data. Let’s translate our problem from before to HQL. In the statement, 1471824000 equals 2016-08-22 (midnight), 1472083199 is 2016-08-24 (23:59:59).
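A quick shell sanity check on those two epoch values (UTC assumed):

```shell
# 1471824000 is an exact multiple of 86400 seconds, i.e. a UTC midnight
echo $((1471824000 % 86400))                       # 0
# the window covers exactly three full days: 2016-08-22, -23 and -24
echo $(( (1472083199 - 1471824000 + 1) / 86400 ))  # 3
```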

UPDATE test.action SET client = client+1 WHERE ts >= 1471824000 AND ts <= 1472083199;

But what’s that?

FAILED: SemanticException [Error 10294]: Attempt to do update or delete using transaction manager that does not support these operations.

The reason why this doesn’t work is simple:

If a table is to be used in ACID writes (insert, update, delete) then the table property “transactional” must be set on that table, starting with Hive 0.14.0. Without this value, inserts will be done in the old style; updates and deletes will be prohibited.

Apart from setting a table property, we would also need to edit the Hive configuration and change the default values for hive.support.concurrency to true and hive.exec.dynamic.partition.mode to nonstrict. That’s not only something that would usually involve several people (not good for our unlucky engineer), it also requires the table to be in ORC format – not for us.

But one thing Hive can also do is create managed tables. With a managed table (instead of an external table like the one we created above), the table is more comparable to a traditional RDBMS table, as it is actually managed by Hive as opposed to being just a layer atop an HBase table. By defining STORED AS TEXTFILE, Hive manages the data as a set of text files on HDFS.

CREATE TABLE IF NOT EXISTS test.action_hdfs(
`key` string,
`userf` int,
`action` int,
`ts` bigint,
`client` int)
STORED AS TEXTFILE
LOCATION '/data';

SELECT COUNT(*) FROM test.action_hdfs;

The result will be 0:


So that needs content – let’s re-use the statement from above:

INSERT OVERWRITE DIRECTORY '/data' SELECT key,userf,action,ts,client FROM test.action WHERE ts >= 1471824000 AND ts <= 1472083199;

This runs a query on our initial, external table and stores the result in HDFS. (Hive will also warn you not to use M/R2 anymore.)


And since it now boils down to a simple text file on HDFS, we can simply read and update its contents using table #2! So, let’s see what’s in the file:

hdfs dfs -ls /data
hdfs dfs -cat /data/000000_0 | head

That doesn’t look terribly convincing, but the reason is simple – Hive uses the ^A (\x01) character as delimiter by default. We can even prove that:

hdfs dfs -copyToLocal /data/000000_0
hexdump -C 000000_0 | head

For those of you who are still afraid of hex dumps: after each field, the next character is a 0x01. Or, in other words: every byte in a line after the last 0x01 byte will be our client variable.
Or, just use cat -v:

Screen Shot 2016-09-02 at 23.17.01

Seems like a random sequence of numbers, but is actually a cheaply stored separated file. An ^Asv, if you will.
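You can reproduce this file format locally without any cluster – the sample line below is made up, mirroring the table from the scenario:

```shell
# write one line the way Hive would, with \x01 (^A) as field delimiter
printf '142_1472676242\x01142\x0134\x011472676242\x011\n' > sample_0

cat -v sample_0    # renders the delimiter visibly as ^A
# split on \x01: field 5 is our cf:client value
cut -d "$(printf '\x01')" -f 5 sample_0   # prints: 1
```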

Let’s finally use all that knowledge and do what we came here to do: Increment our client setting!
This is the command we’re going to use for our data manipulation dry run:

hadoop fs -cat /data/000000_0 | while read line; do c=${line: -1} && c=$((c+1)) && echo $line | sed -e 's/[0-9]$/'$c'/'; done | head
hdfs dfs -cat /data/000000_0 | head

Let's break that down:
hadoop fs -cat /data/000000_0 will pipe our HDFS file to stdout
while read line; do will start a loop for every line
c=${line: -1} will take the line's last character (our client!) as variable $c
c=$((c+1)) will increment said number
echo $line sends the line to stdout, where sed is waiting
sed -e 's/[0-9]$/'$c'/' replaces the digit at the end of the line ([0-9]$) with the newly-incremented variable $c
And there you have it! Note the last number increased by one.
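The same loop can be dry-run on a local file instead of HDFS, which is handy for testing the regex (sample data made up; two ^A-delimited lines):

```shell
# two fake records; the digit after the \x01 byte plays the role of cf:client
printf 'a\x011\nb\x012\n' > sample_0

while read line; do
  c=${line: -1}   # last character of the line = client value
  c=$((c + 1))    # increment it
  echo "$line" | sed -e 's/[0-9]$/'$c'/'
done < sample_0   # prints a^A2 and b^A3 (with ^A as a raw \x01 byte)
```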

By the way: sed does not support Negative Lookaheads in regex. Learned that one the hard way.

If we now want to move that data back to hadoop, all we need to do is pipe to HDFS instead of stdout:

hadoop fs -cat /data/000000_0 | while read line; do c=${line: -1} && c=$((c+1)) && echo $line | sed -e 's/[0-9]$/'$c'/'; done | hadoop fs -put - /data/000000_2
hdfs dfs -cat /data/000000_2 | head

In order to get that data back in our original HBase table, load up hive again:

LOAD DATA INPATH '/data/000000_2' INTO TABLE test.action_hdfs;
SELECT client, COUNT(*) FROM test.action GROUP BY client;

0 16387
1 666258
2 665843
3 651512

16,387 invalid records that are immediately spottable, as well as those hiding in clients 1 and 2 – damn!

INSERT OVERWRITE TABLE test.action SELECT key,userf,action,ts,client FROM test.action_hdfs;
SELECT client, COUNT(*) FROM test.action GROUP BY client;

1 666293
2 665948
3 667759

As 0 used to be an undefined state and its count dropped from 16,387 to zero, we seem to have fixed our mistake. You can further validate this with a full COUNT(*) or by calculating the differences between the client values.

Why you should not do this

Let’s break down what we had to do:
– Create two Hive tables
– Export our data to HDFS (where, technically speaking, it already resides)
– Learn how to interpret an Hive HDFS datafile
– Write and execute some crazy sed-regex-monstrosity
– Do all of that manipulation on a single machine, using stdout and hence memory
– Do all that without ACID or any form of transactional security

Also, we simply ignored all the ideas, frameworks and algorithms behind efficient Big Data – it’s like managing all your financials in macro-loaded Excel files. It works, but it is inefficient, complicated, insecure and slow.

The right way

Let’s see how to do it properly.

CREATE TABLE test.action_buffer AS
SELECT key, userf, action, ts,
client + 1 AS client
FROM test.action
WHERE ts >= 1471824000 AND ts <= 1472083199;
SELECT client, COUNT(*) FROM test.action_buffer GROUP BY client;

That gives us:
1 16387
2 16352
3 16247

INSERT OVERWRITE TABLE test.action SELECT key,userf,action,ts,client FROM test.action_buffer;
SELECT client, COUNT(*) FROM test.action GROUP BY client;

1 666293
2 665948
3 667759
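As a final plausibility check, the new per-client counts are exactly the old full-table counts shifted by the window counts from test.action_buffer (all numbers copied from the query outputs above; note the buffer counts are post-increment):

```shell
# client 1: loses its 16,352 window records (shifted up to 2),
# gains the 16,387 former-0 records
echo $((666258 - 16352 + 16387))   # 666293
# client 2: loses 16,247, gains 16,352
echo $((665843 - 16247 + 16352))   # 665948
# client 3: gains the 16,247 former-2 records
echo $((651512 + 16247))           # 667759
```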
The idea here is quickly explained: instead of updating a record, i.e. trying to modify HBase columns directly, our Hive query is comparable to a simple put() operation on HBase. While we overwrite the entire record, there is no downside to it – the key and all other information stay identical.

There you have it – the same result (no client = 0), much faster, much easier and executed on your distributed computing engine of choice!
