Data Stream Processing

I've spent the past few weeks reading about data stream processing. The underlying problem that research in data stream processing tries to address is given a stream of data and a standing query but not enough space to store all the data (or not enough processing power to rescan all the data each time the query is evaluate), how do we effectively evaluate the query?

Consider the problem of determining how frequently an event occurs in the recent past. The recent past is relative. Thus, it is not enough to simply increment a counter when the corresponding event occurred, we also need to decrement the counter as recorded events leave the recent past. The naïve solution is to record the time at which each event occurred. The amount of space required is proportional to the number of events. If there are a lot of events, e.g., every use of a file, this adds up to a lot of space. Moreover, processing the query will be expensive. The space requirement is typically expressed as the amount of buffer space be sub-linear with respect to the data that is relevant to the query. Meeting this requirement is the domain of data stream processing.

Limiting space often requires accepting approximate answers: computing exact answers sometimes requires more state than is available. Often, however, exact answer are not required; a good approximation includes the degree of error. Data stream processing typically tries to bound relative error. Relative error is the difference between the reported answer and the true answer. Thus, an acceptable relative error of at most 50% allows reporting any value between 5 and 15 if the correct value is 10 and any value between 500 and 1500 if the correct value is 1000. Another way to think about relative error is that the first few decimal places are correct and the rest is a (good) guess.

Introductory Literature

There are a lot of papers that have been published on data stream processing in the last dozen years. I found that "Issues in Data Stream Management--A Survey" by Golab and Özsu (2003) to be a useful survey of the issues being addressed in data stream processing community (note the SIGMOD Record paper is a shorter, abbreviated version). A really nice, concise text book on data streams is "Data Streams: Algorithms and Applications" by S. Muthu Muthukrishnan. The model that I've become rather interested in is the sliding window model, in which the stream is infinite but only the last N elements or time units are relevant to the query. This model is explained well in "STREAM: The Stanford Data Stream Management System" by Arasu et al.

My Motivation: Capturing Files' Histories

The example of tracking all accesses to all files is what motivated me to take a deeper look at data streaming processing. I'm currently working on a project (variously known as Woodchuck, Smart Storage and NetCzar) to enable aggressive prefetching of network data, particularly in the context of mobile devices, such as e-mails, RSS feeds, Podcasts, and place-shifted video. The major challenges of this project include: scheduling the downloads, determining how much space each application should use, and identifying data that can be deleted. The former two issues are topics for another post. As for determining what can be deleted: each application could implement this logic, however, that's a fair amount of work. We're investigating how effectively we can centralize this.

To determine what should be deleted, we need to understand when the file no longer has value or only has marginal value. For a Podcast episode, this might be once it has been listened. For news-like content, this is may be once an episode been superseded by a new episode. In most cases, understanding the access history of the file as well as related files appears essential.

My goal is to succinctly capture the access history of all relevant files. Conceptually, we can think of file accesses as access events. Associated with each file is a stream of access events. As implied by this article, (we think) saving all accesses is too expensive and we'd like a compact data structure that summarizes the history of the file. We'd like to be able to ask questions of the sort: how recently was a particular file last accessed? How frequently was the file used in the last month? in the last year?

Counting Data Structures: Exponential Histograms & Waves

After reviewing several dozen papers, a score or so in depth, I identified two data structures that appear to enable us to answer these recency and frequency queries: exponential histograms (from "Maintaining Stream Statistics Over Sliding Windows" by Datar et al.) and waves (from "Distributed Streams Algorithms for Sliding Windows" by Gibbons and Tirthapura). Both of these data structures are used to solve the so-called counting problem, the problem of determining, with a bound on the relative error, the number of 1s in the last N units of time. In other words, the data structures are able to answer the question: how many 1s appeared in the last n units of time within a factor of Error (e.g., 50%). The algorithms are neat, so I'll present them briefly. See the linked papers for more details and proofs of correctness.

The exponential histogram data structure is a histogram in which buckets recording older data are exponentially wider than the buckets recording more recent data. As a first approximation, the first bucket records when the first 1 occurred, the second bucket when the first of the next two 1s occurred, the third bucket when the first of the next four 1s occurred, etc. Consider the following data structure and illustration:

  struct bucket
  {
    /* The time the most recent 1 that this bucket records was seen.  */
    time_t start;
    /* The number of 1s this bucket records.  */
    int ones;
  };

  Buckets & Time:      |1|3  |23     |27             |
                                          1 1 1 1 1 1
  One:                  1 2 3 4 5 6 7 8 9 0 1 2 3 4 5

Note that the width of the bucket is in terms of 1s, not time. In this example, the second bucket records (23 - 2) = 20 seconds of history and 2 ones, and the third bucket records 27 - 23 = 4 seconds of history yet only 4 references.

When we execute the query (the number of 1s seen in the last n units of time), we simply iterate over the buckets starting with the bucket containing the most recently recorded 1, and find the bucket that covers the time we are interested in. All of the previous buckets certainly recorded 1s that occurred during the time period in question. For the last bucket, we need to interpolate since we only know when the first 1 happened. Call the bucket we need to interpolate on bucket i. The minimum number of 1s that could have been seen is thus sum (buckets[0 <= j < i]) + 1: all the preceding references plus at least the first reference from bucket i. The maximum number of 1s that could have been seen is sum (buckets[0 <= j <= i]). Choosing the mid-point thus ensures a maximum relative error of 50% as sum (buckets[0 <= j < i]) + 1 == sum (buckets[0 <= j <= i]). Assume that i is 3, then we can visualize what is happening as follows:

  1 2 4 8
  -----
    7 

Improving the relative error is straightforward: for each bucket width, we maintain more buckets. In particular, we need C=ceil(1/(2*Error)) + 1 buckets of each width. (Note that this equation says that we need 2 buckets per level for 50% error even though one is only required.)

The only remaining issue is how to update buckets. Consider the above figure: if we add a bucket, we now have two buckets with width one. We can't adjust the buckets' boundaries as don't know when, e.g., event 3 happened. To work around this, we allow either C or C+1 buckets per width. When there are C+2 buckets of the same size, we coalesce the two oldest in the level.

This data structure requires O(log^2(Error * N) / Error) bits, where N is the amount of time to cover: there are log (Error * N) buckets and each bucket needs to record log (N) bits for the time stamp and the number of 1s, respective. A slight space optimization is possible: we only need one bit to record the number of 1s per bucket. Assuming the buckets are ordered by age, we only need to indicate if the bucket in question records the same number of 1s as the previous bucket or twice as many.

The exponential histograms requires a O(log(N)) update due to the requirement to coalesce buckets. The amortized cost, however, is just O(1). The waves data structure improves on this while having the same memory requirement: recording a new 1 requires just O(1) time.

The wave data structure removes the requirement to merge buckets. It records the time at which critical 1s occurred, namely, the 1/Error+1 most recent 1s which are a multiple of 2^i, 0 <= i <= log(2*Error*N). The figure below illustrates the idea. See the paper for more details.

                                                         2   2
                                                         5   7
  2^0:                                                 | | | |
  2^1:                                         |   |   |   |
  2^2:                         |       |       |   22  |   26
  2^3: |               |       12      |       20      |
       0               8               16              24

These data structures can be used to the number of file accesses: file accesses are just 1s! A possible issue (for my purposes) with these two data structures is that the bounded relative error only applies to the query: how many 1s occurred in the last n units of time? If we want to know approximately when the ith 1 occurred, the error is potentially unbounded. Consider again the exponential histogram example presented above. The third 1 may have occurred any time between time 4 and time 22, inclusive. We could "flip the axis" (buckets are separated according to time and not 1s and they record references not time) to answer this query, however, we then have a similar problem when trying to determine the number of references that occurred in the last n units of time. This may not be a real issue as determining when a particular reference occurred (as opposed to the number of references) is primarily of interest for the most recent references, which the data structure does maintain.