Note: This post is part of a series; the overview can be found here: Complex Event Processing with StreamInsight
Writing the StreamInsight Queries
The sample code for the application can be downloaded here:
Scenario 1: Tweets per Second
As the pass-through implementation from the last post showed, we receive a large number of tweets. In the first query we would like to find out how many tweets we receive per second. This turns out to be pretty easy!
We define a tumbling window with a length of one second and call Count() on each window. StreamInsight offers several window types; the three we look at here are:
- TumblingWindow: A fixed-length window. The next window begins when the current one ends, so windows never overlap.
- HoppingWindow: A fixed-length window that can overlap with its neighbors. We define the hop size, that is, the interval after which the next window starts. Example: a 3-second window that moves in 1-second hops gives us updated data every second for the last 3 seconds.
- SnapshotWindow (sliding behavior): This window reacts to the input stream and moves whenever there is a change in the stream, that is, whenever an event starts or ends. The window boundaries are defined by two adjacent event endpoints.
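To make the difference between tumbling and hopping windows concrete, here is a minimal sketch in plain Python. It is not StreamInsight code: the function names and the sample arrival times are made up for illustration, timestamps are whole seconds, and windows starting before time 0 are ignored to keep the example small.

```python
from collections import Counter

def tumbling_counts(timestamps, size):
    """Count events per tumbling window; keys are window start times."""
    counts = Counter()
    for t in timestamps:
        start = (t // size) * size  # each event falls into exactly one window
        counts[start] += 1
    return dict(sorted(counts.items()))

def hopping_counts(timestamps, size, hop):
    """Count events per hopping window; one event can land in several windows."""
    counts = Counter()
    for t in timestamps:
        # Windows start at multiples of `hop`; walk back over all that still cover t.
        start = (t // hop) * hop
        while start > t - size:
            if start >= 0:  # simplification: no windows before time 0
                counts[start] += 1
            start -= hop
    return dict(sorted(counts.items()))

# Hypothetical arrival times of tweets, in whole seconds.
arrivals = [0, 0, 1, 3, 3, 3]
per_second = tumbling_counts(arrivals, size=1)
last_three_seconds = hopping_counts(arrivals, size=3, hop=1)
```

With a tumbling window every tweet is counted once, while the 3-second hopping window counts the same tweet in up to three consecutive windows.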
In order for the example to work, we have to do two more things. First, we need to adjust the consoleObserver. Since the query produces scalar count values (and not TweetItems anymore) we need to adjust the generic type parameter in the DefineObserver() method from TweetItem to long.
Next, we need to replace the twitterstream instance with the query in the Bind() call. We now have a query that operates on the data source, and we bind the observer to that query instead of to the raw stream.
That’s it! The output should look somewhat like this:
Amazing stuff already, but wait, there’s more!
Tweets per Second grouped by Language
For the next query we are going to group the tweets by language. We first need a data object for the computed output. As a little twist we add a method call that resolves the culture info that we find in the Tweet.
Then we continue and write our query. Remember that we have to adjust the generic type parameter of the observer to use the new LanguageSummary type:
The query leads to output like this:
Nice.
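The grouping idea can be sketched outside StreamInsight as well. The following plain Python snippet is only an illustration under assumed names: tweets are hypothetical (timestamp, language code) pairs, and the result is one counter per one-second tumbling window.

```python
from collections import Counter, defaultdict

def counts_by_language(tweets, size=1):
    """Return {window_start: {language: count}} for tumbling windows."""
    windows = defaultdict(Counter)
    for timestamp, language in tweets:
        start = (timestamp // size) * size  # tumbling window the tweet belongs to
        windows[start][language] += 1
    return {start: dict(langs) for start, langs in sorted(windows.items())}

# Hypothetical input: (arrival time in seconds, language code of the tweet).
tweets = [(0, "en"), (0, "es"), (0, "en"), (1, "ja"), (1, "en")]
summary = counts_by_language(tweets)
```

Each inner dictionary plays the role of a set of LanguageSummary rows for one window: a language and the number of tweets seen in that second.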
Language with most Tweets per Second
Next, we improve the last query by adding another query that uses the output of the last example as its input. We want to know the top 5 languages in terms of tweets per second. We use the snapshot window here to react to changes in the stream, that is, to changes in the output of the last query.
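Conceptually, the second query takes one snapshot of per-language counts and keeps the five largest. A minimal Python sketch of that ranking step follows; the function name and the numbers are invented for illustration, and the alphabetical tie-break is an assumption of this sketch, not something StreamInsight is known to do.

```python
def top_languages(language_counts, n=5):
    """Rank languages by count, highest first; ties are broken alphabetically."""
    ranked = sorted(language_counts.items(), key=lambda item: (-item[1], item[0]))
    return ranked[:n]

# Hypothetical output of the previous query for a single snapshot.
snapshot = {"en": 42, "es": 17, "ja": 23, "pt": 11, "id": 9, "tr": 4, "fr": 9}
top5 = top_languages(snapshot)
```

Because the ranking runs on the output of the grouping query, it only has to be recomputed when one of the counts changes, which is what the snapshot window provides.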
The most popular Tweet every three Seconds
Next, we want to find the most popular Tweet every three seconds. First, we create a new data class:
Then we write a query that groups all the tweets in every 3 second window by the number of followers that the user has:
Note: The numbers after the user name show (Followers/Friends).
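As an illustration of the idea, the snippet below picks, for every 3-second tumbling window, the tweet whose author has the most followers. It is a plain Python sketch with hypothetical field names and sample data, not the StreamInsight query itself.

```python
def most_followed_per_window(tweets, size=3):
    """Return {window_start: tweet}, keeping the tweet whose author has the most followers."""
    best = {}
    for tweet in tweets:
        start = (tweet["time"] // size) * size
        if start not in best or tweet["followers"] > best[start]["followers"]:
            best[start] = tweet
    return best

# Hypothetical tweets; "time" is the arrival time in whole seconds.
tweets = [
    {"time": 0, "user": "alice", "followers": 120, "friends": 80},
    {"time": 2, "user": "bob", "followers": 950, "friends": 40},
    {"time": 4, "user": "carol", "followers": 300, "friends": 310},
]
winners = most_followed_per_window(tweets)
```

Here the first window (seconds 0 to 2) is won by "bob" and the second window (seconds 3 to 5) by "carol".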
In the next step we improve the last example by adding the friend count. We want to know the person in every three-second window who has the most followers AND the most friends. We implement this by writing two separate queries, one for followers and one for friends, and then joining them with a StreamInsight join operation.
Let’s start by writing the most-friends query. It is similar to the followers query:
Now we use the join to join the queries on the user name:
And that is it. If we run the sample, we find out who the most popular person on Twitter is every three seconds:
An interesting observation that we can make looking at the data is that in some windows, e.g. 06:21:57, there is no event. This means that in that window the person with the most followers and the person with the most friends were not the same person, so the join produced no match.
The join is a very powerful operation that can give us deep insight into the data.
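The effect of the join, including the empty windows, can be reproduced with a small Python sketch. All names and data here are hypothetical; the snippet only mimics an inner join on window and user name and is not StreamInsight code.

```python
def top_user_per_window(tweets, field, size=3):
    """Return {window_start: user} for the user with the highest `field` value."""
    best = {}
    for tweet in tweets:
        start = (tweet["time"] // size) * size
        if start not in best or tweet[field] > best[start][field]:
            best[start] = tweet
    return {start: tweet["user"] for start, tweet in best.items()}

def join_on_user(left, right):
    """Inner join: keep only windows where both sides name the same user."""
    return {
        start: user
        for start, user in left.items()
        if right.get(start) == user
    }

# Hypothetical tweets; "time" is the arrival time in whole seconds.
tweets = [
    {"time": 0, "user": "alice", "followers": 120, "friends": 80},
    {"time": 2, "user": "bob", "followers": 950, "friends": 40},
    {"time": 4, "user": "carol", "followers": 300, "friends": 310},
]
by_followers = top_user_per_window(tweets, "followers")
by_friends = top_user_per_window(tweets, "friends")
most_popular = join_on_user(by_followers, by_friends)
```

In the first window "bob" has the most followers but "alice" has the most friends, so the join emits nothing for it; only the second window, where "carol" leads both rankings, produces a result.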
If you want to play around and try out more complex queries, look at the resources in the first blog post of the series, especially at the LINQPad Samples and the Hitchhiker guide.
I hope you enjoyed this series on Microsoft StreamInsight. If it provided substantial value to you, feel free to donate. 🙂