Stream Join
- Deals with unbounded ("infinite") input; the output must be produced in real time.
- Windows are time-based (tuples expire after a set time) vs. count-based (a fixed number of tuples).
- Multi-core hardware => parallelize the join.
Kang’s three-step join algorithm
- Assuming tuple r arrives from input stream R:
  - Scan stream S’s window to find tuples matching r.
  - Insert the new tuple r into the window for stream R.
  - Invalidate all expired tuples in stream R’s window.
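The three steps above can be sketched in Python as follows. This is a minimal illustration, not the original implementation: the class name `ThreeStepJoin`, the `predicate` callback, and the time-based window with `(timestamp, value)` pairs are all assumptions made for the example. The age check inside the scan is an extra guard that is not part of the three steps.

```python
from collections import deque


class ThreeStepJoin:
    """Sliding-window join of streams R and S using the three-step procedure."""

    def __init__(self, window_size, predicate):
        self.window_size = window_size  # assumed: same unit as the timestamps
        self.predicate = predicate      # hypothetical match test: (r, s) -> bool
        self.windows = {"R": deque(), "S": deque()}  # (timestamp, value) pairs

    def on_tuple(self, stream, ts, value):
        own = self.windows[stream]
        other = self.windows["S" if stream == "R" else "R"]

        # Step 1: scan the opposite window for matching tuples.
        # The age check is an added guard (an assumption of this sketch) so
        # that tuples still awaiting invalidation cannot produce matches.
        matches = []
        for other_ts, other_value in other:
            if ts - other_ts > self.window_size:
                continue
            r, s = (value, other_value) if stream == "R" else (other_value, value)
            if self.predicate(r, s):
                matches.append((r, s))

        # Step 2: insert the new tuple into its own stream's window.
        own.append((ts, value))

        # Step 3: invalidate expired tuples in its own stream's window.
        while own and ts - own[0][0] > self.window_size:
            own.popleft()
        return matches
```

For a tuple arriving on S the roles are simply mirrored, which is why a single `on_tuple` method handles both streams. Timestamps are assumed to be non-decreasing.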
Stream Join RoadMap
Cell Join
- For heterogeneous architectures
Handshake Join
- Applies the classical three-step procedure within small windows.
- Point-to-point links between processing cores.
- Missed-join pair problem => two-phase forwarding (asymmetric core synchronization protocol): whenever the right core C_right places a tuple s into its left send queue, it still keeps a copy of s in its local window but marks it as forwarded. The forwarded tuple remains available for joining on C_right until the second phase of tuple forwarding, which is initiated by an ack message from C_left.
- Autonomic load balancing =>

    if R-window is larger than that of our right neighbor then
        place oldest r_i into rightSendQueue;
        mark r_i as forwarded;
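To make the two-phase forwarding idea more concrete, here is a small single-threaded Python sketch. The `Core` class, its method names, and the synchronous "delivery" of queued tuples are assumptions made for illustration; a real handshake join runs the cores concurrently and exchanges messages over the point-to-point links.

```python
from collections import deque


class Core:
    """One processing core holding a segment of the S window (sketch only)."""

    def __init__(self, name):
        self.name = name
        self.s_window = []            # entries are [tuple, forwarded_flag]
        self.left_send_queue = deque()
        self.left = None              # left neighbour, set by the caller

    def forward_oldest_s(self):
        """Phase 1: queue the oldest unforwarded s, keep a marked copy."""
        for entry in self.s_window:
            if not entry[1]:
                entry[1] = True                      # mark as forwarded
                self.left_send_queue.append(entry[0])
                return entry[0]
        return None

    def deliver_left(self):
        """Stand-in for the link: hand queued tuples to the left neighbour."""
        while self.left_send_queue:
            s = self.left_send_queue.popleft()
            self.left.receive_s(s, sender=self)

    def receive_s(self, s, sender):
        """The left core stores s, then acknowledges it to the sender."""
        self.s_window.append([s, False])
        sender.on_ack(s)

    def on_ack(self, s):
        """Phase 2: the ack arrived, so the forwarded copy can be dropped."""
        for i, entry in enumerate(self.s_window):
            if entry[1] and entry[0] == s:
                del self.s_window[i]
                return

    def joinable_s(self):
        """Tuples that can still be joined here, forwarded copies included."""
        return [entry[0] for entry in self.s_window]
```

The point of the sketch is that between `forward_oldest_s` and `on_ack` the tuple is visible on the right core, so an R tuple arriving in that interval cannot miss it.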
Low latency handshake join
- Observation: handshake join has higher latency because tuples may have to queue for long periods before they encounter matching partners.
Split Join
- Different workflow: top-down data flow
- Does not rely on a central coordinator
- Components: a distribution network (Distributor), a set of independent join cores (JoinCore/JC), and a result-gathering network (Combiner/Merger)
- Overall:
  - Distribution tree: k-ary tree
  - Parallelism: sub-windows
  - Join core: left region -> stores tuple r in the sliding window; right region -> processes the replicated copy of tuple r
  - Case: when a new tuple r arrives, compare r with all the tuples in the S stream sub-windows, then assign r to one storage based on round-robin selection.
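The case described above (replicate the new tuple to every join core for processing, store it in exactly one core chosen round-robin) might look like this in Python. The class names, the `predicate` callback, and the flat loop standing in for the k-ary distribution tree are simplifying assumptions, not the actual SplitJoin design.

```python
class JoinCore:
    """One independent join core with its own R and S sub-windows."""

    def __init__(self):
        self.sub_windows = {"R": [], "S": []}

    def process(self, stream, value, predicate):
        """Processing role: compare the replicated tuple with the opposite sub-window."""
        if stream == "R":
            return [(value, s) for s in self.sub_windows["S"] if predicate(value, s)]
        return [(r, value) for r in self.sub_windows["R"] if predicate(r, value)]

    def store(self, stream, value):
        """Storage role: keep the tuple in this core's sub-window."""
        self.sub_windows[stream].append(value)


class SplitJoin:
    def __init__(self, num_cores, predicate):
        self.cores = [JoinCore() for _ in range(num_cores)]
        self.predicate = predicate            # hypothetical match test (r, s) -> bool
        self.next_store = {"R": 0, "S": 0}    # round-robin position per stream

    def insert(self, stream, value):
        results = []
        # Every core receives a copy and joins it against its own sub-window.
        for core in self.cores:
            results.extend(core.process(stream, value, self.predicate))
        # Exactly one core stores the tuple, selected round-robin.
        idx = self.next_store[stream]
        self.cores[idx].store(stream, value)
        self.next_store[stream] = (idx + 1) % len(self.cores)
        return results
```

Because each stored tuple lives in only one sub-window, every matching pair is produced once, and no coordination between the cores is needed. Expiration is left out of this sketch.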
Expiration policy:
- Time-based: each tuple has a lifespan l

    Expiration Process(t, sub-window X) begin
        i = the end of sub-window X;
        while t.timestamp - t_i.timestamp > Time Window Size do
            omit t_i from sub-window X;
            i = i - 1;

- Count-based: every sub-window has a fixed size
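Both expiration policies can be sketched in a few lines of Python. The function names and the `(timestamp, value)` layout are assumptions for the example, and unlike the pseudocode (which walks backwards from the end of the sub-window) this sketch keeps the oldest tuple at the left end of a `deque`.

```python
from collections import deque


def expire_time_based(sub_window, new_ts, window_size):
    """Drop tuples whose age relative to the new tuple exceeds the window size.

    sub_window is a deque of (timestamp, value) pairs, oldest first.
    Returns the list of expired pairs.
    """
    expired = []
    while sub_window and new_ts - sub_window[0][0] > window_size:
        expired.append(sub_window.popleft())
    return expired


def insert_count_based(sub_window, item, capacity):
    """Insert into a fixed-size sub-window, evicting the oldest item if full.

    Returns the evicted item, or None if nothing was evicted.
    """
    evicted = None
    if len(sub_window) >= capacity:
        evicted = sub_window.popleft()
    sub_window.append(item)
    return evicted
```

In the time-based case the number of stored tuples varies with the arrival rate, while in the count-based case each insertion into a full sub-window evicts exactly one tuple.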
Particular join algorithm:

    Processing Core(t, sub-window X) begin
        forall t_i-tuple in sub-window X do
            compare t_i-tuple with t;
            if match then
                emit the matched result;
        if i = 0 (mod ordering precision) then
            emit punctuation star;

RAP (relaxed adjustable punctuation) strategy
- Punctuation-based ordering
- Outer ordering (strict/relaxed)
- Inner ordering (the results for a newly inserted tuple are in time order)
- Employs a combiner rather than a merger
- Tunable: produce a punctuation after joining one newly inserted tuple (strict outer ordering) or after N tuples (relaxed outer ordering)
- Data flow when merging:

    N-ary Merger(t) begin
        foreach right(or left)-region of core1..N in sequence do
            while a resulting tuple (t) is available in output buffer till the first star do
                pop t from join core's output buffer;
                push t to Merger's output buffer;
        push out the end of result star;
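The interplay between the processing core's punctuation and the N-ary merger can be illustrated with a short Python sketch. The names `STAR`, `ProcessingCore`, and `n_ary_merge` are hypothetical, the counter `processed` is one reading of the index i in the pseudocode, and the merger here does not block: it assumes every core has already emitted its star for the current round.

```python
from collections import deque

STAR = "*"  # punctuation marker; the concrete value is an assumption


class ProcessingCore:
    def __init__(self, sub_window, predicate, ordering_precision=1):
        self.sub_window = sub_window
        self.predicate = predicate
        self.ordering_precision = ordering_precision  # 1 = strict outer ordering
        self.processed = 0
        self.output = deque()

    def process(self, t):
        """Join t against the sub-window, then emit a star every N tuples."""
        for t_i in self.sub_window:
            if self.predicate(t_i, t):
                self.output.append((t_i, t))
        self.processed += 1
        if self.processed % self.ordering_precision == 0:
            self.output.append(STAR)


def n_ary_merge(cores):
    """Drain each core in sequence up to its first star, then emit one star."""
    merged = []
    for core in cores:
        while core.output:
            item = core.output.popleft()
            if item == STAR:
                break
            merged.append(item)
    merged.append(STAR)  # end-of-result punctuation
    return merged
```

With `ordering_precision=1` each merge round contains the results of exactly one inserted tuple, which corresponds to strict outer ordering. A larger value groups the results of N tuples between stars, trading ordering precision for fewer punctuations.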