Start Stream Join from SoccerJoin to SplitJoin

Stream Join

  • Deals with unbounded (“infinite”) input; the output must be produced in real time.
  • Windows are either time-based (each tuple expires after a fixed lifespan) or count-based (a fixed number of tuples).
  • Multi-core hardware => parallelize the join.
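
The two window types above can be sketched as follows. This is a minimal illustration, not taken from any of the papers: the class names `CountWindow` and `TimeWindow` and the `(timestamp, value)` tuple layout are assumptions made for the example, and tuples are assumed to arrive in timestamp order.

```python
from collections import deque

class CountWindow:
    """Count-based window: keeps at most `size` most recent tuples."""
    def __init__(self, size):
        # A deque with maxlen drops the oldest item automatically.
        self.items = deque(maxlen=size)

    def insert(self, timestamp, value):
        self.items.append((timestamp, value))

class TimeWindow:
    """Time-based window: keeps tuples no older than `span` time units."""
    def __init__(self, span):
        self.span = span
        self.items = deque()

    def insert(self, timestamp, value):
        self.items.append((timestamp, value))
        # Arrival is in timestamp order, so expired tuples sit at the front.
        while self.items and timestamp - self.items[0][0] > self.span:
            self.items.popleft()

count_w = CountWindow(size=3)
time_w = TimeWindow(span=5)
for ts in [1, 2, 3, 8, 9]:
    count_w.insert(ts, f"v{ts}")
    time_w.insert(ts, f"v{ts}")

count_ts = [ts for ts, _ in count_w.items]  # [3, 8, 9]
time_ts = [ts for ts, _ in time_w.items]    # [8, 9]
```

With the same input, the count-based window always holds three tuples, while the time-based window shrinks or grows with the arrival rate.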

Kang’s three-step join algorithm

  • Assuming tuple r arrives from input stream R:
  1. Scan stream S’s window to find tuples matching r.
  2. Insert new tuple r into window for stream R.
  3. Invalidate all expired tuples in stream R’s window.
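
The three steps can be sketched as a small single-threaded join. This is a sketch under assumptions: the window is count-based, and the names `make_join`, `on_arrival`, and `predicate` are hypothetical, chosen only for this example. The same procedure runs symmetrically when a tuple arrives from S.

```python
from collections import deque

def make_join(window_size, predicate):
    """Return an arrival handler implementing the three-step procedure."""
    windows = {"R": deque(), "S": deque()}

    def on_arrival(stream, tup):
        # Step 1: scan the opposite stream's window for matches.
        if stream == "R":
            results = [(tup, s) for s in windows["S"] if predicate(tup, s)]
        else:
            results = [(r, tup) for r in windows["R"] if predicate(r, tup)]
        # Step 2: insert the new tuple into its own stream's window.
        windows[stream].append(tup)
        # Step 3: invalidate expired tuples in that window
        # (assumption: count-based expiry, oldest tuples first).
        while len(windows[stream]) > window_size:
            windows[stream].popleft()
        return results

    return on_arrival

join = make_join(window_size=2, predicate=lambda r, s: r == s)
out = []
out += join("R", 1)
out += join("S", 1)   # matches R's 1
out += join("R", 2)
out += join("R", 3)   # R's window is now [2, 3]; 1 has expired
out += join("S", 1)   # no match, 1 is no longer in R's window
out += join("S", 3)   # matches R's 3
```

After these arrivals `out` holds `[(1, 1), (3, 3)]`: the second `S` tuple with value 1 finds no partner because its match was already invalidated in step 3.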

Stream Join RoadMap

Kang's algorithm (Naive)
=> Cell Join
=> Soccer (HandShake) Join
=> Low Latency HandShake Join
=> Split Join

Cell Join

  • Targets heterogeneous architectures (the Cell processor)

Handshake Join

  • Applies the classical three-step procedure within a small window on each core.
  • Point-to-point links between processing cores.
  • Missed-join-pair problem => two-phase forwarding (an asymmetric core synchronization protocol): whenever the right core C_right places a tuple s into its left send queue, it still keeps a copy of s in its local window but marks it as forwarded. The forwarded tuple remains available for joining on C_right until the second phase of tuple forwarding, which is initiated by an ack message from C_left.
  • Autonomic load balancing =>

    if R-window is larger than that of our right neighbor then
        place oldest ri into rightSendQueue;
        mark ri as forwarded;
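
The forwarding rule and the two-phase protocol can be combined in a direction-agnostic sketch. Everything named here (`Core`, `forward_if_larger`, `deliver`, `on_ack`) is hypothetical, and two simplifications are assumed: window sizes are compared using non-forwarded tuples only, and delivery plus acknowledgement happen in one synchronous call instead of over message queues between threads.

```python
from collections import deque

class Core:
    def __init__(self, name):
        self.name = name
        self.window = []           # entries are [tuple, forwarded_flag], oldest first
        self.send_queue = deque()  # tuples in flight to the neighbor

    def live_size(self):
        # Assumption: forwarded copies do not count toward the window size.
        return sum(1 for _, fwd in self.window if not fwd)

    def joinable(self):
        # Forwarded copies stay joinable until the ack arrives.
        return [t for t, _ in self.window]

    def forward_if_larger(self, neighbor):
        """Phase 1: queue the oldest non-forwarded tuple, keep a marked copy."""
        if self.live_size() > neighbor.live_size():
            for entry in self.window:
                if not entry[1]:
                    entry[1] = True
                    self.send_queue.append(entry[0])
                    return entry[0]
        return None

    def deliver(self, neighbor):
        """Neighbor stores the tuple; its ack triggers phase 2 on the sender."""
        t = self.send_queue.popleft()
        neighbor.window.append([t, False])
        self.on_ack(t)

    def on_ack(self, t):
        """Phase 2: drop the forwarded copy from the local window."""
        for i, (tup, fwd) in enumerate(self.window):
            if fwd and tup == t:
                del self.window[i]
                break

a, b = Core("sender"), Core("receiver")
a.window = [["t1", False], ["t2", False], ["t3", False]]
sent = a.forward_if_larger(b)   # "t1" is queued and marked
during = a.joinable()           # still ["t1", "t2", "t3"]
a.deliver(b)
after = a.joinable()            # ["t2", "t3"] once the ack has arrived
```

Between phase 1 and the ack, the tuple is visible on the sender, so a tuple travelling in the opposite direction cannot slip past it unseen.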


Low latency handshake join

  • Observation: latency is higher because tuples may have to queue for long periods before they encounter matching partners.

Split Join

  • Different workflow: top-down data flow
  • No need to rely on a central coordinator
  • Components: a distribution network (Distributor), a set of independent join cores (JoinCore/JC), and a result-gathering network (Combiner/Merger)
  • Overall:
  1. Distribution tree: k-ary tree
  2. Parallelism: sub-windows
    • Join core: left region -> stores tuple r in the sliding window; right region -> processes the replicated copy of tuple r
    • Case: when a new tuple r arrives, it is compared with all tuples in the S-stream sub-windows; r is then assigned to one join core for storage by round-robin selection.
  3. Expiration policy:

    • Time-based: each tuple has a lifespan l

      Expiration Process(t, sub-window X) begin
          i = the end of sub-window X;
          while t.timestamp − ti.timestamp > Time Window Size do
              omit ti from sub-window X;
              i = i−1;
    • Count-based: every sub-window has a fixed size

  4. Join algorithm of each processing core:

    Processing Core(t, sub-window X) begin
        forall ti-tuple in sub-window X do
            compare ti-tuple with t;
            if match then
                emit the matched result;
        if i = 0 (mod ordering precision) then
            emit punctuation star;
  5. RAP (relaxed adjustable punctuation) strategy

    • Punctuation-based ordering
    • Outer ordering (strict/relaxed)
    • Inner ordering (the results for a newly inserted tuple are in time order)
    • Employs a combiner rather than a merger
    • Tunable: emit a punctuation after joining one newly inserted tuple (strict outer ordering) or after N tuples (relaxed outer ordering)
    • Data flow when merging
      N-ary Merger(t) begin
          foreach right (or left) region of core1..N in sequence do
              while a resulting tuple (t) is available in output buffer till the first star do
                  pop t from join core’s output buffer;
                  push t to Merger’s output buffer;
          push out the end of result star;
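
The pieces of Split Join listed above (replication to all join cores, round-robin storage, sub-window expiration, punctuated merging) can be put together in a single-threaded sketch. It is an illustration under stated assumptions rather than the published design: the distribution tree is flattened into a loop, sub-windows are count-based, a punctuation is emitted after every input tuple (strict outer ordering), and the names `JoinCore`, `SplitJoin`, `push`, and `STAR` are hypothetical.

```python
from collections import deque

STAR = "*"  # punctuation marker (hypothetical representation)

class JoinCore:
    def __init__(self, sub_window_size):
        # Count-based sub-windows: maxlen evicts the oldest tuple on overflow.
        self.sub = {"R": deque(maxlen=sub_window_size),
                    "S": deque(maxlen=sub_window_size)}
        self.out = deque()  # per-core output buffer

    def process(self, stream, tup, store, predicate):
        other = "S" if stream == "R" else "R"
        # Processing: compare the replicated tuple with the local
        # sub-window of the opposite stream.
        for stored in self.sub[other]:
            r, s = (tup, stored) if stream == "R" else (stored, tup)
            if predicate(r, s):
                self.out.append((r, s))
        # Strict outer ordering: one punctuation per processed input tuple.
        self.out.append(STAR)
        # Storage: only the round-robin owner keeps the tuple.
        if store:
            self.sub[stream].append(tup)

class SplitJoin:
    def __init__(self, n_cores, sub_window_size, predicate):
        self.cores = [JoinCore(sub_window_size) for _ in range(n_cores)]
        self.predicate = predicate
        self.next_owner = {"R": 0, "S": 0}

    def push(self, stream, tup):
        owner = self.next_owner[stream]
        self.next_owner[stream] = (owner + 1) % len(self.cores)
        # Distributor: replicate the tuple to every join core.
        for i, core in enumerate(self.cores):
            core.process(stream, tup, store=(i == owner),
                         predicate=self.predicate)
        return self.merge()

    def merge(self):
        # Visit cores in sequence, draining each buffer up to its first star.
        merged = []
        for core in self.cores:
            while True:
                item = core.out.popleft()
                if item is STAR:
                    break
                merged.append(item)
        merged.append(STAR)  # end-of-result punctuation
        return merged

sj = SplitJoin(n_cores=2, sub_window_size=2, predicate=lambda r, s: r == s)
out1 = sj.push("R", 1)  # stored on core 0
out2 = sj.push("R", 2)  # stored on core 1
out3 = sj.push("S", 2)  # matched on core 1
out4 = sj.push("S", 1)  # matched on core 0
```

Each tuple is stored on exactly one core but compared on all of them, so every pair is evaluated once and no core needs to talk to a neighbor.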