Analyzing Twitter Location Data with Heron, Machine Learning, Google’s NLP, and BigQuery

Introduction

In this article, we will use Heron, the distributed stream processing and analytics engine from Twitter, together with Google’s NLP toolkit, Nominatim, and some Machine Learning, as well as Google’s BigTable, BigQuery, and Data Studio, to plot Twitter users’ assumed locations across the US.

We will show how much your Twitter profile actually tells someone about you, how it is possible to map your opinions and sentiments to parts of the country without having the location enabled on the Twitter app, and how Google’s Cloud can help us achieve this.

About language, locations, social media, and privacy

While it is safe to assume that most Twitter users do not enable the Location Services while using the Social network, we can also assume that a lot of people still willingly disclose their location – or at least something resembling a location – on their public Twitter profile.

Furthermore, Twitter is (for the most part) a public network – and a user’s opinions (subtle or bold) can be fed into various Data Mining techniques, most of which disclose more than meets the eye.

Putting this together with the vast advances in publicly available, easy-to-use, cloud-driven solutions for Natural Language Processing (NLP) and Machine Learning (ML) from the likes of Google, Amazon, or Microsoft, any company or engineer with the wish to tap into this data has more powerful tool sets at their disposal than ever before.

Scope of the Project

For this article, we will write a Heron Topology that does the following –

  • Read from Twitter, given certain seed keywords, filtering out users that do not disclose any location information, either as metadata or profile data
  • Use the Google NLP service to analyze the tweets and a user’s location data
  • Use Nominatim (based on OpenStreetMap data) to apply reverse-geocoding on the results
  • Use DBSCAN clustering with a Haversine distance metric to cluster our results
  • Write the results back to Google BigTable or BigQuery

Then, we’ll visualize the results with Data Studio.

Architecture

The architecture for this process is fairly simple:

Architecture (simplified)

Heron serves as our stream processing engine and runs the local ML; Nominatim on PostgreSQL serves as the geo-decoder.

On Google Cloud, we use the NLP API to enrich data, BigTable and BigQuery for storage and Data Studio for visualization.

BigTable (think HBase) is used for simple, inexpensive mass inserts, while BigQuery is used for analytics. For the sake of simplicity, I’ll refer you to one of my older articles, which explains quite a bit about when to use BigTable/HBase and when not to.

Hybrid Cloud

While the notion of “Hybrid Cloud” warrants its own article, allow me to give you an introduction to what it means in this context.

For this article, I heavily utilized the Google Cloud stack. The Google NLP API gives me simple access to NLP analysis without extensive research or the need for complex libraries and training sets.

Google BigTable and BigQuery provide two serverless, powerful data storage solutions that can easily be used from your programming language of choice – BigTable simply uses the Apache HBase interface.
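As a quick illustration of that HBase-interface claim, a write to BigTable looks like plain HBase code. This is a minimal sketch; the project ID, instance ID, table name, and column family below are placeholders, not the actual values used in this project.

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class BigtableWriteExample {
    public static void main(String[] args) throws Exception {
        // Connect to BigTable through the HBase-compatible client
        try (Connection connection = BigtableConfiguration.connect("my-project", "my-instance");
             Table table = connection.getTable(TableName.valueOf("tweets"))) {
            // Row key and column family are illustrative only
            Put put = new Put(Bytes.toBytes("tweet#123456"));
            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("text"), Bytes.toBytes("hello world"));
            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("state"), Bytes.toBytes("GA"));
            table.put(put);
        }
    }
}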

Google Data Studio can access those cloud-based sources and visualize them similarly to what, e.g., Tableau can achieve, without the complexity and up-front cost that come with such tools (which doesn’t imply Data Studio can do everything Tableau can).

At the same time, my Nominatim instance as well as my Heron cluster still run on my local development machine. The reason is simply cost – setting up multiple Compute Engine and/or Kubernetes instances quickly exceeds any reasonable expense for a bit of free-time research.

Translated into “business” terminology – we have a “legacy” system that is heavily integrated into the business infrastructure, and the capital expense of moving to a different technology does not justify the overall lower TCO. Or something along those lines…

The following section describes the implementation of the solution.

Reading from Twitter

First off, we get data from Twitter using the twitter4j library. A Heron spout consumes the data and pushes it down the line. We use a set of keywords defined in conf.properties to consume an arbitrary topic.
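The actual spout lives in the project’s source; below is a minimal sketch of what it could look like, assuming the Storm-compatible API. The class name, constructor, and emitted tuple layout are illustrative, not the real implementation.

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

// Hypothetical, trimmed-down spout: buffer filtered statuses and only forward
// tweets that carry some location hint (GPS or profile location).
public class TwitterFilterSpout extends BaseRichSpout {

    private final String[] keywords; // loaded from conf.properties in the real topology
    private SpoutOutputCollector collector;
    private LinkedBlockingQueue<Status> queue;
    private transient TwitterStream twitterStream;

    public TwitterFilterSpout(String[] keywords) {
        this.keywords = keywords;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.queue = new LinkedBlockingQueue<>(1000);

        twitterStream = new TwitterStreamFactory().getInstance();
        twitterStream.addListener(new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                boolean hasLocation = status.getGeoLocation() != null
                        || (status.getUser() != null
                            && status.getUser().getLocation() != null
                            && !status.getUser().getLocation().isEmpty());
                if (hasLocation) {
                    queue.offer(status); // drop the tweet if the buffer is full
                }
            }
        });
        twitterStream.filter(new FilterQuery().track(keywords));
    }

    @Override
    public void nextTuple() {
        Status status = queue.poll();
        if (status == null) {
            Utils.sleep(50); // don't busy-spin while no tweets are queued
            return;
        }
        // Emit plain strings instead of twitter4j objects to avoid serialization issues
        collector.emit(new Values(status.getText(), status.getUser().getLocation()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("text", "profileLocation"));
    }
}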

Here, we ensure that a tweet contains location information, either from Twitter or via the user’s own profile.

if (location != null) {
    Document locDoc = Document.newBuilder()
            .setContent(location).setType(Type.PLAIN_TEXT).build();

    List<Entity> locEntitiesList = language.analyzeEntities(locDoc, EncodingType.UTF16).getEntitiesList();
    String locQuery = getLocQueryFromNlp(locEntitiesList);

    // Build the Nominatim query from the NLP result
    NominatimSearchRequest nsr = nomatimHelper.getNominatimSearchRequest(locQuery);

    // Geocode the query and keep only plausible results
    JsonNominatimClient client = nomatimHelper.getClient();
    ArrayList<String> reverseLookupIds = new ArrayList<>();
    List<Address> addresses = new ArrayList<>();
    if (!locQuery.isEmpty()) {
        addresses = client.search(nsr);
        for (Address ad : addresses) {
            logger.debug("Place: {}, lat: {}, long: {}", ad.getDisplayName(),
                    String.valueOf(ad.getLatitude()),
                    String.valueOf(ad.getLongitude()));

            Location loc = LocationUtil.addressToLocation(ad);

            // Filter out results that are not actual data points within the US
            String osmType = ad.getOsmType();

            if (osmType != null && (osmType.equals("node") || osmType.equals("relation") ||
                    osmType.equals("administrative")) && loc.isWithinUSA()) {
                locList.add(loc);
                reverseLookupIds.add(loc.getReverseLookupId());
            }
        }
    }
}

You could re-route users that do not disclose location data to an alternate pipeline and still analyze their tweets for sentiment and entities.
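A hedged sketch of how such a split could look with the Storm-compatible API – the stream name "no-location" and the field layout are made up for illustration:

// Inside the filtering bolt: declare a second output stream for tweets without
// usable location data, and route tuples accordingly.
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("text", "profileLocation"));      // default stream
    declarer.declareStream("no-location", new Fields("text"));    // alternate pipeline
}

@Override
public void execute(Tuple input) {
    String text = input.getStringByField("text");
    String location = input.getStringByField("profileLocation");

    if (location == null || location.isEmpty()) {
        // Sentiment/entity analysis only - no geocoding possible
        collector.emit("no-location", input, new Values(text));
    } else {
        collector.emit(input, new Values(text, location));
    }
    collector.ack(input);
}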

Understanding Twitter locations

The next bolt applies Google’s Natural Language toolkit to two parts of the tweet: its content and the user’s location.

For an example, let’s use my own tweet about the pointless “smart” watch I bought last year. If we analyze the tweet’s text, we get the following result:

(Granted, this isn’t the best example – I simply don’t tweet that much)

For this article, we won’t focus too much on the actual content. While it is a fascinating topic in itself, we will focus on the location of the user for now.

When it comes to that, things become a little more intriguing. When people submit tweets, they have the option to add a GPS location to their tweet – but unless you enable it, this information simply returns null. Null Island might be a thing, but not a useful one.

However, we can also assume that many users use their profile page to tell others something about themselves – including their location. As this data gets exposed by Twitter’s API, we get something like this:

While a human as well as any computer can easily understand my profile – it means Atlanta, Georgia, United States Of America, insignificant little blue-green planet (whose ape-descended lifeforms are so amazingly primitive that they still think digital smart watches are a great idea), Solar System, Milky Way, Local Group, Virgo Supercluster, The Universe – it’s not so easy for the more obscure addresses people put in their profile.

A random selection –

  • What used to be Earth
  • Oregon, (upper left)
  • in a free country
  • MIL-WI
  • Oban, Scotland UK 🎬🎥🎥🎬
  • United States
  • your moms house
  • Between Kentucky, Ohio
  • Savannah Ga.
  • Providece Texas A@M Universty
  • USA . Married over 50 yrs to HS Sweetheart
  • 24hrs on d street hustling

(All typos and emojis – the tragedy of the Unicode standard – were copied “as is”)

In case you are wondering, “your moms house” is in Beech Grove, IN.

The sample set I took most of this data from had only 25 entries and was parsed by hand. Out of those, 16 used the state names from ANSI standard INCITS 38:2009, albeit in various formats.

A whole 24% used what I called “other” – like “your moms house”.

On a bigger scale, out of a sample set of 22,800 imported tweets, 15,630 (68%) had some type of location in their profile, but only 11 had their actual GPS location enabled.


For the time being, we can conclude that most Twitter users tell us something about their location – intentionally or not. For a more scientific approach, here’s a link – keep in mind that my data is a random selection.

However – using user-entered data always results in messy, fuzzy, non-structured data. This has been a problem way before the terms “Machine Learning” or “Analytics” exceeded any marketing company’s wildest dreams. Levenshtein-Distance record matching, anyone?

Using NLP to identify entities & locations

At this point, Google’s NLP toolkit comes into play again. We run it over the user’s self-entered “place” and pick out every entity that carries the LOCATION type.

This is simple for something like this:

“Oregon” is clearly the location we need. We were able to strip off “upper left” – and could even weight this based on the salience value.

However, more obscure queries result in more random results:

But even here, we get the core of the statement – USA – with a high confidence level.

The more random place descriptions (such as “a free country”) naturally only produce low-quality results – which is why we should filter results with a low salience score. While this only means “relative to this set of text, this entity is relatively important/unimportant”, it does serve as a rough filter.

In order to use more standardized data, we can also use the wikipedia_url property of the NLP toolkit (if available) and extract a more useful string. This turns “Baltimore, MD” into “Baltimore Maryland”, for instance.

However, “Atlanta” turns into “Atlanta (TV Series)” – so use it with caution.
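The getLocQueryFromNlp helper used in the bolt snippet above could look roughly like the following. This is my own sketch: the salience threshold and the wikipedia_url handling are assumptions, not necessarily the project’s exact implementation.

// Rough sketch of a helper that builds a geocoder query from the NLP entity list.
// MIN_SALIENCE (0.1) is an arbitrary example value.
private static final double MIN_SALIENCE = 0.1;

static String getLocQueryFromNlp(List<Entity> entities) {
    StringBuilder query = new StringBuilder();
    for (Entity entity : entities) {
        // Ignore non-locations and low-confidence entities
        if (entity.getType() != Entity.Type.LOCATION || entity.getSalience() < MIN_SALIENCE) {
            continue;
        }
        // Prefer the (more standardized) Wikipedia title if the API provides one
        String wikiUrl = entity.getMetadataMap().get("wikipedia_url");
        if (wikiUrl != null && !wikiUrl.isEmpty()) {
            String title = wikiUrl.substring(wikiUrl.lastIndexOf('/') + 1).replace('_', ' ');
            query.append(title).append(' ');
        } else {
            query.append(entity.getName()).append(' ');
        }
    }
    return query.toString().trim();
}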


Reverse Geocoding & Clustering indecisive answers

In the next step, assuming we received unstructured location data, we try to convert that data into a query for a location service that supports reverse geocoding. Location services and their respective APIs are plentiful online – as long as a service is able to convert a query of location data into a set of potential matches, we will be able to use that data.

In this case, we are using a Nominatim client for OpenStreetMap data. Given the data volume, it is advisable to host Nominatim yourself, on Kubernetes or your local development machine – the openstreetmap.org servers will block your IP if you accidentally DDoS them or simply don’t respect their Fair Use Policy – and the velocity of a streaming pipeline tends to violate basically every Fair Use clause in existence.
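For reference, a search against a self-hosted instance boils down to a handful of HTTP parameters under the hood. The host, port, and hard-coded query below are placeholders; the client library assembles something equivalent for you.

// Illustrative only: the raw Nominatim search call, bounded to the US and limited
// to 20 candidates, against a hypothetical local instance.
String url = "http://localhost:8080/search"
        + "?q=Springfield"
        + "&format=json&addressdetails=1&limit=20&countrycodes=us";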

OpenStreetMap will return a list of potential matches. Take this example when our location is “Springfield” and we limit the results to 20:

Springfield data points

As you can see, this is not conclusive. So we need to find a way to figure out which result is most accurate.

Fun Fact: Without a country boundary on the US in Nominatim, this is what “Detroit Michigan” produces:

Using clustering to approximate locations

In order to figure out where on the map our result is, we use Density-based spatial clustering of applications with noise (DBSCAN), a clustering algorithm that maps points by their density and also takes care of any outliers.

DBSCAN illustration

I found this article’s description of the algorithm the most conclusive. Short version – for a dataset of n-dimensional data points, an n-dimensional sphere with radius ɛ is defined, as well as the set of data points within that sphere. If the number of points in the sphere exceeds a defined min_points, a cluster is formed. For all points except the center, the same logic is applied recursively.

DBSCAN requires the ɛ parameter to be set to the maximum distance between two points for them to be considered part of the same neighborhood. In order to give this parameter a meaningful value, we use the Haversine distance, which yields the orthodromic distance on a sphere – in our case, a rough approximation of the earth – and therefore a result in kilometers between locations.

The Haversine formula, in its standard arcsine form (the code below uses the equivalent atan2 formulation), is defined as follows –
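d = 2r \arcsin\left(\sqrt{\sin^2\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos(\varphi_1)\,\cos(\varphi_2)\,\sin^2\left(\frac{\lambda_2 - \lambda_1}{2}\right)}\right)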

where

  • d is the distance between the two points (along a great circle of the sphere; see spherical distance),
  • r is the radius of the sphere,
  • φ1, φ2: latitude of point 1 and latitude of point 2, in radians
  • λ1, λ2: longitude of point 1 and longitude of point 2, in radians

In our case, r is defined as the radius of the earth, 6,371 km.

To combine those, we can use the org.apache.commons.math3.ml package. All we need to do is implement the DistanceMeasure interface (as well as the function for the Haversine distance itself).

public static List<Cluster<Location>> dbscanWithHaversine(ArrayList<Location> input) {
    // EPS is the neighborhood radius in kilometers, MIN_POINTS the minimum cluster size
    DBSCANClusterer<Location> clusterer = new DBSCANClusterer<>(EPS, MIN_POINTS, new HaversineDistance());
    return clusterer.cluster(input);
}

public class HaversineDistance implements DistanceMeasure {
    @Override
    public double compute(double[] doubles, double[] doubles1) throws DimensionMismatchException {
        if (doubles.length != 2 || doubles1.length != 2)
            throw new DimensionMismatchException(doubles.length, doubles1.length);

        Location l1 = new Location("A", doubles[0], doubles[1], 0, "N/A");
        Location l2 = new Location("B", doubles1[0], doubles1[1], 0, "N/A");
        return MathHelper.getHaversineDistance(l1, l2);
    }
}

public static double getHaversineDistance(Location loc1, Location loc2) {
    double latDistance = toRad(loc2.getLatitude() - loc1.getLatitude());
    double lonDistance = toRad(loc2.getLongitude() - loc1.getLongitude());
    double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2) +
            Math.cos(toRad(loc1.getLatitude())) * Math.cos(toRad(loc2.getLatitude())) *
                    Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
    double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    return R * c; // R = earth's radius, 6,371 km
}

By putting 2 and 2 together, we get a DBSCAN implementation that accepts an ɛ in kilometers.

While choosing the parameters for DBSCAN can be tricky, we use 150 km / 93 mi as the maximum radius of a cluster and assume that even a single point constitutes a valid cluster. While this, in theory, produces a lot of noise, it works well for clustering our location set.

For interpreting our clusters (and choosing the one that is seemingly correct), we rely on the average “importance” value of OpenStreetMap, which aggregates multiple important metrics (from e.g., Wikipedia) to “score” a place.

If the user’s location contains part of a state as String (e.g., Springfield, IL), we increase the importance score during execution.

Springfield, data points

 

Springfield, avg. importance by cluster

In our Springfield example, Springfield, IL is the state capital of Illinois – and that is one of the reasons the OpenStreetMap data ranks it higher than the entire Springfield, MA cluster (which consists of Springfield, MA, Springfield, VT, and Springfield, NH) – despite their combined population being bigger than that of Illinois’ capital.

Assuming we have multiple points in a cluster, we take the average between all coordinates in it. This is accurate enough in a ~90mi radius and results in the rough center of the (irregular) polygon that is our cluster.
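A simple sketch of that averaging step – the “location” of a cluster is just the mean of its members’ coordinates. Method and getter names follow the snippets above; this is not necessarily the project’s exact code.

// Returns {averageLatitude, averageLongitude} for a DBSCAN cluster
public static double[] getClusterCenter(Cluster<Location> cluster) {
    double lat = 0, lon = 0;
    List<Location> points = cluster.getPoints();
    for (Location loc : points) {
        lat += loc.getLatitude();
        lon += loc.getLongitude();
    }
    return new double[]{lat / points.size(), lon / points.size()};
}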

While I’ve just explained that Springfield, IL could be considered an accurate result, in order to illustrate the averaging, we simply remove Springfield, IL from the equation and our “best” cluster looks like this:

Sample Cluster for Springfield, MA

(The red dot in the middle is the average coordinate result)

Finally, we retrofit the calculated location data to a US state. For this, we have two options –

  • Call Nominatim again, resulting in another relatively expensive API call
  • Approximate the result by using a local list of rough state boundaries

While both methods have their pros and cons, using a geo provider will undoubtedly produce more accurate results, especially in cities like NYC or Washington, DC, where state borders are close to any given point.

For the sake of simplicity and resource constraints, I’m using a singleton implementation of a GSON class that reads a list of US states with rough boundaries, which I’ve mapped from XML to JSON.
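A minimal sketch of that bounding-box approach – the class, field names, and JSON layout are my own illustration, not the actual mapping file:

// Hypothetical representation of one entry in the state boundary list read via GSON
public class UsState {
    String name;
    double minLat, maxLat, minLon, maxLon; // rough bounding box

    boolean contains(double lat, double lon) {
        return lat >= minLat && lat <= maxLat && lon >= minLon && lon <= maxLon;
    }
}

// Given the averaged cluster coordinate, return the first state whose rough bounding
// box contains it (boxes overlap near borders, hence the accuracy caveat above).
public static String lookupState(List<UsState> states, double lat, double lon) {
    for (UsState state : states) {
        if (state.contains(lat, lon)) {
            return state.name;
        }
    }
    return "UNKNOWN";
}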

In our case, the result is either New Hampshire or Illinois, depending on whether we remove Springfield, IL or not.

Other examples

Now, what tells us that somebody who states they are from “Springfield” simply likes the Simpsons?

Well, nothing. While it is advisable to store multiple potential location results and revisit that data (or even use a different algorithm with a proper training set based on it), the architecture and algorithms work surprisingly well – some random profiling produced mostly accurate results, despite various input formats:

Sample Tweet locations across the US by quantity

(The big dot in the middle represents the location “USA”)

Original Location | Result | Accurate
Philadelphia | Philadelphia, Philadelphia County, Pennsylvania, United States of America | TRUE
Brooklyn, NY | BK, Kings County, NYC, New York, 11226, United States of America | TRUE
nebraska | Nebraska, United States of America | TRUE
United States | United States of America | TRUE
Truckee, CA | Truckee, Donner Pass Road, Truckee, Nevada County, California, 96160, United States of America | TRUE
Lafayette, LA | Lafayette, Tippecanoe County, Indiana, United States of America | TRUE
Minot, North Dakota | Minot, Ward County, North Dakota, United States of America | TRUE
Rocky Mountain hey! | Rocky Mountain, Harrisonburg, Rockingham County, Virginia, United States of America | TRUE
Living BLUE in Red state AZ! | Arizona, United States of America | TRUE
Earth Two, Darkest Timeline | Earth Tank Number Two, Fire Road 46, Socorro County, New Mexico, United States of America | FALSE
The Golden State | Golden, Jefferson County, Colorado, United States of America | FALSE
Atlanta, GA | Atlanta, Fulton County, Georgia, United States of America | TRUE
thessaloniki | Thessaloniki Jewelry, 31-32, Ditmars Boulevard, Steinway, Queens County, NYC, New York, 11105, United States of America | FALSE
newcastle | Newcastle, Placer County, California, 95658, United States of America | TRUE
Afton, VA | Afton, Lincoln County, Wyoming, 83110, United States of America | FALSE
Gary, IN / Chicago, IL | Chicago, Cook County, Illinois, United States of America | TRUE
Canada | Canada, Pike County, Kentucky, 41519, United States of America | FALSE
Southern California | Southern California Institute of Architecture, 960, East 3rd Street, Arts District, Little Tokyo Historic District, LA, Los Angeles County, California, 90013, United States of America | TRUE
San Francisco Bay Area | San Francisco Bay Area, SF, California, 94017, United States of America | TRUE
Southern CA | Southern California Institute of Architecture, 960, East 3rd Street, Arts District, Little Tokyo Historic District, LA, Los Angeles County, California, 90013, United States of America | TRUE

(All of these results come with latitude and longitude, state data and the full user profile and tweet metadata)

More importantly, tuning those results is just an exercise in careful profiling. We could filter out obvious countries that are not the US, tune the model parameters or the API calls.

Tweet locations across the US by avg. importance

(In this example, big points indicate a high confidence level; often points in the geographical center of a state hint that the user simply said they were from e.g. “Arizona”, “AZ” or “Living BLUE in Red state AZ!”)

Summary

While the example shown here is a simple proof of concept, there are plenty of opportunities for extending it –

  • Fine-tune the model, filtering obvious outliers
  • Build a data model that connects location data with the tweets, all other available metadata
  • Store multiple salience values per analysis, tuning the model based on the data
  • Run the topology at scale and apply it to all tweets concerning a certain topic, analyzing the big picture and accounting for false positives
  • Run regression or other analyses over entries from users with the same ID and potential mismatches, tracking changes in location; write a pipeline that flags users who enable their location and retrofits old results to an accurate GPS position
  • Store users without location data and apply the above logic to those

One thing we can conclude is that combining well-known, powerful local Big Data tools with managed, equally powerful cloud solutions opens the door to a massive variety of new analytics opportunities that would have required a much higher level of involvement and cost only a few years ago.

The next steps will be to combine this data with the actual location results, create heatmaps, fine-tune the model, and eventually move the whole solution to the Google Cloud Platform.

Sample entity analysis

All development was done under Fedora 27 with 16 AMD Ryzen 1700 vCores @ 3.2Ghz and 32GiB RAM. Nominatim planet data from 2018-03-15 was stored on a 3TB WD RED Raid-1 Array

Software Stack: Heron 0.17.6, Google Cloud Language API 1.14.0, Google Cloud BigTable API 1.2.0, Google Cloud BigQuery API 0.32.0-beta, Google Data Studio, Nominatim 3.1.0, PostgreSQL 9.6.8


Storm vs. Heron – Part 2 – Why Heron? A developer’s view

This article is part 2 of an upcoming article series, Storm vs. Heron. Follow me on Twitter to make sure you don’t miss the next part!

In the last part of the series, we looked at how to transform your existing Storm topologies to Twitter’s new distributed streaming- and analytics-framework, Heron. In this part of the series, we will actually see why you would want to do this. This part will see Storm from a developer’s view, whereas the next part will focus on operations & maintenance.

Why Storm leaves stuff to be desired

First off, let me start by stating that Apache Storm surely is a great and flexible framework, but it often doesn’t quite satisfy all the requirements an enterprise-ready Big Data component should fulfil. Its weaknesses show in both its architecture and the way an enterprise can handle Storm development, testing, deployment, and maintenance.

The things one could objectively criticize about Storm boil down to the following points:

  • Unified resource assumption & performance (Developers)
  • Insufficient resilience features (Developers)
  • Insufficient debugging abilities (Operations)
  • Insufficient monitoring abilities (Operations)

Naturally, all of these depend on your business’s specific requirements for the use cases you want to build – but generally speaking, these points should affect the majority of users.

Some points might be simplified for the sake of accessibility to the topic.

Unified resource assumption & performance

First, let’s have a look at how Storm structures its minions:

Storm worker architecture

The top-level hierarchy consists of one (< Storm 1.x) or multiple, highly available (>= Storm 1.x) so-called Nimbus servers, which coordinate your topologies. These Nimbus servers communicate with your second level, the supervisor nodes that execute your code. An optional UI node as well as an optional log viewer process can be deployed as well.

Every supervisor runs multiple JVMs (so-called workers), which in turn execute your actual code fragments in their respective executors. And here’s the first big issue with this design – you have absolutely no ability to properly control your resource usage. In your respective topologies, you can freely set different numbers of executors for your bolts and spouts, but you have no ability to control where these processes will end up1. It is possible that a long-running, complex part of your topology shares a JVM with a simple calculation component. It becomes almost impossible to isolate and profile these tasks separately (I actually once wound up writing a rather complex parser in R, iterating over JVM dumps, logs, etc. for that task).
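To illustrate: parallelism is configured per component, but the scheduler decides which worker JVM each executor lands in. The component and class names below are placeholders, not code from my topology.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        // Parallelism hints only control *how many* executors run, not *where* they run
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("tweet-spout", new TweetSpout(), 1);          // 1 executor
        builder.setBolt("heavy-bolt", new HeavyAnalysisBolt(), 3)      // 3 executors
               .shuffleGrouping("tweet-spout");
        builder.setBolt("simple-bolt", new SimpleCounterBolt(), 2)     // 2 executors
               .shuffleGrouping("heavy-bolt");

        Config conf = new Config();
        // Executors get packed into these 2 worker JVMs - you don't control the mix
        conf.setNumWorkers(2);

        StormSubmitter.submitTopology("analysis-topology", conf, builder.createTopology());
    }
}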

Also, if one of the more complex bolts crashes (e.g. an uncaught exception), it takes down completely unrelated tasks with it!

Storm JVM architecture

Furthermore, all JVMs have a fixed memory setting – depending on the most RAM-intensive component currently running. The resources this JVM holds are not linked with any other Hadoop scheduling component, such as YARN or Mesos, making it almost impossible to use shared nodes (e.g. Storm supervisors + HDFS datanodes) if you care about resource allocation2.

In reality, this basically forces you to think about deploying entirely separate Storm clusters per topology. This may sound drastic, but makes sense if you think about it.

Imagine a topology with two main parts – a spout taking up 10GB of memory and a bolt that needs 5GB to do its job. If you now run 1 spout and 3 bolts across two workers, you will effectively need to allocate 30GB, as one of the JVMs will need at least 15GB (one spout and one bolt). This effectively wastes 5GB of memory (1*10 + 3*5 = 25GB actually needed). And yes, this example is blatantly taken from this paper (which I highly recommend reading).

Now, there’s more “under the hood” stuff that Storm isn’t exactly smart about, for instance…

  • A lack of backpressure
  • Overly harsh load on Zookeeper by filling it with heartbeats and states
  • A lot of useless threads for sending and receiving tuples
  • A lot of GC action
  • Merged log files per worker

Heron

Now, Heron is a lot smarter about basically all of this.

Heron Architecture

Heron uses Aurora, a framework atop Mesos, to schedule its topologies. This solves the dilemma of Storm allocating its resources independently. YARN is also an option for existing setups.

Heron runs user code and its managers (used for coordinating the topology’s components) in so-called containers. If you’ve ever worked with YARN / M/R2, you know the general idea.

Within these containers, Heron Instances run the user code in separate JVMs. With this approach, dedicated resource allocation, profiling, and debugging are possible. It also gives us dedicated logs and access to these JVMs in the UI.

A Stream Manager now efficiently manages the tuple communication and backpressure and a Topology Master communicates metadata with Zookeeper.

Only metadata is kept in Zookeeper – not all heartbeats and transactions, as in Storm.

Performance

Naturally, this has an impact on performance as well. Heron claims to be “up to 14x faster” than Storm. And who am I to doubt them?

Well, personally, I don’t fall for marketing that aims to throw big numbers at me – performance in real-life scenarios is limited by hundreds of factors, for instance I/O implementations and limits, network throughput, API limits and so on and so forth.

But still: running my example topology from Part 1 in single-node mode (i.e., locally), without Mesos, with a limited Twitter API and a horrible HDFS implementation, on a single AWS r3.xlarge instance (Intel Xeon E5-2670, 4 vCores, 30GiB RAM) gave me 60% more throughput – which makes me believe Twitter’s “14x” claim (for WordCount) much more.


So, if you activate Aurora, run it in an actual distributed mode and optimize the known bottlenecks (HDFS, Twitter API), I can see this thing performing much better.

And even if it doesn’t – without Heron, I would barely be able to even think about profiling my bolts, because everything is mushed into an unreadable, nasty array of JVMs.

Heron: 1. Storm: 0.

Insufficient resilience features

Resilience is a term many enterprises love to use and can mean many things. For some businesses, it translates to “stateful processing”, in others to “stateful & exactly-once processing”. Some just refer to the architecture of the framework and cluster setup (what happens if a node breaks down or is not available?). In the end, it means that our topologies need to produce results – data – that the business can use and usually that it manages errors. And you, as an engineer, need to think about how to do that with the framework at hand.

System failures

Storm manages some of these requirements fairly well – as long as you give it enough supervisor machines to play with. If one of these machines stops sending heartbeats via Zookeeper, it stops being available for your topologies. Storm (read: the nimbus) re-allocates the resources to the existing machines. As soon as the machine becomes available again, Storm will make it part of its supervisor network again. And if you use a Storm version >= 1.0.x, you can also remove the SPoF of the nimbus.

Stateful processing with Trident or Storm

Stateful processing, however, is a whole different story. Usually, a tuple is stateless and its lifetime is managed by the acknowledgement mechanism you chose to implement. Let me be blunt and quote from the official documentation:

“There’s two things you have to do as a user to benefit from Storm’s reliability capabilities. First, you need to tell Storm whenever you’re creating a new link in the tree of tuples. Second, you need to tell Storm when you have finished processing an individual tuple. By doing both these things, Storm can detect when the tree of tuples is fully processed and can ack or fail the spout tuple appropriately. Storm’s API provides a concise way of doing both of these tasks. Specifying a link in the tuple tree is called anchoring” http://storm.apache.org/releases/1.0.2/Guaranteeing-message-processing.html

In other words, you rely on your queue (Kafka, JMS, Kestrel, Twitter…) for managing your resilience. If something fails – and stuff will fail eventually – you need to re-process the entire tuple-tree. Technically speaking, this process is still stateless, but more secure than just hoping your topology won’t mess something up.
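In code, that anchoring and acknowledging looks roughly like this – a generic bolt, not one from our topology, with the processing step kept deliberately trivial:

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AnchoringBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String processed = input.getString(0).toUpperCase();
            // Anchoring: the emitted tuple is linked to the input tuple in the tuple tree
            collector.emit(input, new Values(processed));
            // Tell Storm this bolt has finished processing the tuple
            collector.ack(input);
        } catch (Exception e) {
            // On failure, the spout tuple gets replayed from the source (Kafka, Kestrel, ...)
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("processed"));
    }
}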

To actually achieve stateful processing, i.e. storing information about the progress and status of a tuple, you have two options left: Storm’s new Stateful API or Trident. Trident offers many great features, but it is another layer atop Storm that you will need to manage and understand. See this article from the documentation for further reference.

Storm’s stateful API is pretty new and relies on a KeyValueStore and a persistence layer of your choice (currently, that boils down to Redis). If your worker crashes, Storm can then go ahead, read from said store, and re-process the tuples.

All of these mechanisms require you to put quite a lot of thought into your design. If you compare this with the way SparkStreaming3 handles this – by supplying a checkpoint directory on HDFS – you quickly notice why one might be sceptical about Storm’s way of doing things.

me_irl

Heron

Now, here’s the deal with Heron – while it aims to improve many things “under the hood” compared to Storm (architecture, performance, monitoring, debugging…), it is still fully compatible with Apache Storm (see Part 1 of this series). While this arguably is a huge benefit for corporations using Storm right now, it does not actually change the development model much – in other words, resilience will still be a topic you have to think about yourself. Now, if I’m the one who missed a killer feature that Heron introduced for this, let me know!

Conclusion

So, what can we take away from this as developers? First of all, we need to acknowledge (no pun intended) that Heron is built for using Storm code – with all its up- and downsides. While the basic programming model of Storm is fairly simple, it quickly gets complicated when we need to throw catchphrases like “states”, “WAL” or “exactly once” into the discussion.

However: just executing the existing code yields many benefits. We get better performance even in extremely simplified setups, we utilize our machines’ resources much better than Storm ever could (saving $$ in the process!), and we potentially save ourselves several headaches when it comes to crashing JVMs and other fun failures you encounter on any Big Data cluster.

In the next part, the benefits will get even more obvious – when we take a detailed look at how you can actually operate, monitor, and eventually debug a Storm vs. Heron cluster in production environments.

 

Footnotes:

  1. Not entirely true per se – you can look into things like the isolation scheduler, but that is basically working around a core issue in the software’s design.
  2. Storm on YARN is possible, for instance via Hortonworks’ Big Data distribution, albeit not out of the box.
  3. I don’t see SparkStreaming as a “holy grail” of frameworks. Yes, it receives more updates, yes, it seems more mature in many ways, but it is far from being an alternative for Storm/Heron/Flink/Samza for any case. But this would require me to rant in another article.

Storm vs. Heron, Part 1: Reusing a Storm topology for Heron

This article is part 1 of an upcoming article series, Storm vs. Heron. Follow me on Twitter to make sure you don’t miss the next part!

When upgrading your existing Apache Storm topologies to be compatible with Twitter’s newest distributed stream processing engine, Heron, you can just follow the instructions over at Heron’s page, as Heron aims to be fully compatible with existing Storm topologies. But is it really that simple? I’ve tried that, using my very much real topology and was not overly surprised by the result.

Setup

So, here’s my setup:

  • Hadoop 2.7.3 (HDFS + YARN)
  • Apache Storm 1.0.2
  • Twitter Heron 0.14.3
  • macOS Sierra 10.12

All my dependencies come from using

$ brew install

Topology

First off, the topology I’m using is my Twitter Analysis topology that reads in tweets (filtered by keywords, using twitter4j), runs a couple of analysis queries on these (based on a prototypical word list from pre-defined reference accounts, basically sh*tty, supervised machine learning for non Data Scientists) and persists them into HDFS, using the storm-hdfs library.

I’ll come to the details in a later article; for now, all we need to know is that we use the most recent stable version of Storm and an external library that does what Storm does best: stream analysis and simple storage.

The process

What Twitter recommends is simple: Install Heron, remove the following dependency

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>storm-VERSION</version>
  <scope>${scope.provided}</scope>
</dependency>

from your Maven pom.xml, and replace it with Heron’s dependencies (roughly as sketched below). Sounds too amazing to be true, considering that updating from Storm 0.x to 1.x required actual code changes! Not to mention the much easier deployment, configuration, and under-the-hood improvements Heron comes with. But unfortunately, not everything went as smoothly as planned…
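At the time of writing, the replacement looked roughly like the following – verify the exact group/artifact IDs and version against the Heron documentation for your release, as these coordinates have changed over time:

<!-- Replacement dependencies (illustrative - check the Heron docs for current coordinates) -->
<dependency>
  <groupId>com.twitter.heron</groupId>
  <artifactId>heron-api</artifactId>
  <version>0.14.3</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>com.twitter.heron</groupId>
  <artifactId>heron-storm</artifactId>
  <version>0.14.3</version>
  <scope>compile</scope>
</dependency>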

Issues

storm-hdfs

As of right now, Heron seems not to be compatible with storm-hdfs (despite stating otherwise on the project’s GitHub in the past), judging by this error:

java.lang.NoClassDefFoundError: org/apache/storm/utils/TupleUtils

You have a couple of options here, I guess. Heron seems unable to work with pre-compiled classes containing Storm objects, so your best shot would be to grab the library’s source and integrate it into your project as regular classes and packages.

The other option is re-writing the library. While that seems like an expensive endeavour, it may prove useful if you require more control or flexibility anyways. As I did this in the past with the storm-hbase library for those exact reasons, I’m pretty sure that this is far from a viable option for everyone, but surely it will work for some.

Considering the sheer amount of external libraries for Storm (kafka, hdfs, hbase, mqtt, hive, jms, redis …), this could turn out to be a real problem, though. So, if you know of a smarter alternative, let me know!

Update: Thanks to the Heron team over at Twitter, the external systems for Heron are work in progress!

twitter4j

Also, twitter4j did not exactly leave me happy. While my simple spout worked, it was not able to emit the twitter4j Status interface, representing a tweet, as it usually comes in the form of a twitter4j.StatusJSONImpl.class, which is not visible from outside the twitter4j package. As Heron uses kryo for serialization, I was not able to register it with the Heron configuration:

conf.registerSerialization(twitter4j.StatusJSONImpl.class);

The solution to this was fairly simple, after the Reflections library failed me as well: I used a custom wrapper object, emulating the Status interface and hence remaining compatible with the rest of my code. Not pretty, but works – and required me to touch code after all. Remember to register these classes as well!
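The wrapper looked something like this – trimmed down, with the field selection purely illustrative:

import java.io.Serializable;

import twitter4j.Status;

// Minimal, kryo-friendly stand-in for twitter4j's Status - only the fields we actually use
public class TweetWrapper implements Serializable {
    private long id;
    private String text;
    private String userLocation;
    private double latitude;
    private double longitude;

    public TweetWrapper() { } // serialization needs a no-arg constructor

    public TweetWrapper(Status status) {
        this.id = status.getId();
        this.text = status.getText();
        this.userLocation = status.getUser() != null ? status.getUser().getLocation() : null;
        if (status.getGeoLocation() != null) {
            this.latitude = status.getGeoLocation().getLatitude();
            this.longitude = status.getGeoLocation().getLongitude();
        }
    }
    // getters omitted
}

In the topology setup, the wrapper then gets registered instead of the twitter4j class:

conf.registerSerialization(TweetWrapper.class);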

The asm library

That one was tricky – after fixing the previous errors, I was left with this:

IncompatibleClassChangeError: Found interface org.objectweb.asm.MethodVisitor, but class was expected

As it turns out, the method call you see there used to be an interface call in asm 3.x, but was switched to a class in 4.x+. Adding this to my pom.xml fixed that one as well:

<dependency>
  <groupId>org.ow2.asm</groupId>
  <artifactId>asm</artifactId>
  <version>4.0</version>
</dependency>

Conclusion

Heron surely is awesome when we compare it to Storm – at least in theory. While the idea of literally re-using your entire source with a new, better technology seems thrilling, reality looked a bit less exciting. Anyway, after a couple of hours of debugging and applying fixes that could maybe have been done a bit less brutally, it is now running beautifully on my local machine. Time to test it on a cluster!

Heron UI

Disclaimer

Heron is in development and things change quickly. These issues might not be true for your setup and there might be a perfectly viable solution to all of this – but I wasn’t able to find it and hence decided to document my experiences here.
