一个排序的有界的并发Buffer队列Java实现

15-12-30 banq
         

Java中使用TreeSet作为排序队列或Buffer,而ConcurrentSkipListSet是支持并发的队列,如果我们需要一个能实时排序又支持并发的队列或Buffer怎么办?

设想一个场景,当我们不断加入元素到集合,等所有元素都加入完毕后,我们只需要获得排序的前5个元素,比如我们对半年内所有帖子或文章根据点赞量获得前五篇点赞量最多的文章,首先我们需要将半年所有文章加入一个集合,这个集合是根据点赞量排序的,添加完毕后,我们需要获得排名前5篇文章。

这可以通过JDK提供的 NavigableSet实现:

NavigableSet<Integer> aSet = new TreeSet<>();
    aSet.add(5);
    //..add more elements
    aSet.pollFirst();//'0'th
    aSet.pollFirst();//'1'th
    aSet = aSet.descendingSet();
    aSet.pollFirst();//'n'th
    aSet.pollFirst();//'n-1'th
<p>

但是这种集合是无界的,它需要保留所有元素,其实有时这是没有必要的,因为我们只感兴趣所有元素中子集的子集,也就是说没有必要从数据库中获得所有结果。

有没有更好的数据结构能实现有界而且像以前能够排序呢?实际是某种BoundedNavigableSet. 而且需要支持并发访问,多个生产者可以同时将元素放入其中。

这种数据结构能够在分布式查询聚合实现中非常方便,特别是对分布式key-value数据库如Cassandra, 基于多个分区查询获得的结果需要收集融合在一起。

下面是使用AtomicReferenceArray实现的无锁算法插入:

/*
Scan the array from given offset to 'insert' the item
at a proper sort level
*/
boolean addItem(int fromOffset, T item)
{
    for (int i = fromOffset; i < size(); i++)
    {
      // if there is no element at 'i'th position
      // set item at 'i'
      if (!buffer.compareAndSet(i, null, item))
      {
        // compare and swap using Comparator provided
        // or, if element implements Comparable
        T swapped = compareAndSwap(i, item);
        if (swapped != null)
        {
          // the item has been placed. so break. but then
          // the element currently at 'i' has been swapped. so find its new
          // position, if present
          // we could have scanned from the 'i+1'th position, but to be safe
          // just in case some other element higher up was removed and this
          // needs to go up in that case
          if (i + 1 < size()) {
            addItem(0, swapped);
          }
          return true;
        }
      }
      else {
        return true;
      }
    }
    return false;
  }
<p>

Compare 和 swap也是一个原子操作:

private T compareAndSwap(int i, T item)
  {
    boolean set = false, greater = false;
    T t = null;
    while (!set) {
      t = buffer.get(i);
      // either i-th element was replaced with this item
      // or by some other element
  // compare using Comparator/Comparable
      greater = compare(item, t) > 0;
      set = buffer.compareAndSet(i, t, greater ? item : t);
    }
    return greater ? t : null;
  }
<p>

removal操作也应该是原子的:

public boolean remove(Object o) {
    T b;
    if(o == null)
      return false;
    for (int i = 0; i < size(); i++) {
      b = buffer.get(i);
      if (o.equals(b))
      {
        if(buffer.compareAndSet(i, b, null))
        {
          //shift left elements
          for (int j = i + i; j < size(); j++) {
            //check position form start. it can be possible that another higher item has been removed in the meantime
            addItem(0, buffer.get(j));
          }
          return true;
        }
      }
    }
    return false;
  }
<p>

完整源码见:Github

[该贴被banq于2015-12-30 11:25修改过]