
Hazelcast Getting Started Tutorial

Hazelcast (www.hazelcast.com) is an in-memory data grid that offers Java programmers mission-critical transactions and tera-scale in-memory applications.

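A Hazelcast cluster is "masterless", which means it is not a client-server system. There is a cluster leader, by default the oldest member, which manages how data is distributed across the system. If that node goes down, the next-oldest node takes over.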
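The "oldest member leads" rule can be illustrated with a small, self-contained sketch. It is not Hazelcast's internal implementation: the ClusterSketch class and its method names are hypothetical and only model the ordering rule described above, where members are kept in join order, the first one is the leader, and the next-oldest takes over when it leaves.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the leader rule; not part of the Hazelcast API. */
class ClusterSketch {

  // Members in join order: index 0 is always the oldest surviving member.
  private final List<String> members = new ArrayList<>();

  void join(String member) {
    members.add(member);
  }

  void leave(String member) {
    members.remove(member);
  }

  /** The leader is the oldest surviving member, or null if the cluster is empty. */
  String leader() {
    return members.isEmpty() ? null : members.get(0);
  }
}

class ClusterSketchDemo {
  public static void main(String[] args) {
    ClusterSketch cluster = new ClusterSketch();
    cluster.join("node-A");
    cluster.join("node-B");
    cluster.join("node-C");
    System.out.println(cluster.leader()); // prints node-A

    // The oldest member goes down, so the next-oldest takes over.
    cluster.leave("node-A");
    System.out.println(cluster.leader()); // prints node-B
  }
}
```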

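The data structures you use (maps, lists, and queues) are all held in memory. If a single node in the cluster dies, no data is lost, but if several nodes go down at the same time, you are in trouble.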

The following uses the BigWideWorld example to show how Hazelcast is used. First, add the dependency:

<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>3.1</version>
</dependency>

import java.util.Random;

public class BigWideWorld {

  private static Random rand = new Random(System.currentTimeMillis());

  private final Users users = new Users();

  private final int totalNumUsers = users.size();

  /** Returns the username of a randomly chosen user. */
  public String nextUser() {

    User user = users.get(rand.nextInt(totalNumUsers));
    String name = user.getUsername();
    return name;
  }
}

The nextUser() method returns the name of a random user. The collection of users is managed by the Users class.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class Users {

  /** The users in the database */
  private final User[] users = { new User("fred123", "Fred", "Jones", "fredj@a.com"),
      new User("jim", "Jim", "Jones", "jimj@a.com"),
      new User("bill", "Bill", "Jones", "bill@a.com"),
      new User("ted111", "Edward", "Jones", "tedj@a.com"),
      new User("annie", "Annette", "Jones", "annj@a.com"),
      new User("lucy", "Lucy", "Jones", "lucyj@a.com"),
      new User("jimj", "James", "Jones", "jimj@a.com"),
      new User("jez", "Jerry", "Jones", "fredj@a.com"),
      new User("will", "William", "Jones", "willj@a.com"),
      new User("shaz", "Sharon", "Jones", "shazj@a.com"),
      new User("paula", "Paula", "Jones", "pauj@a.com"),
      new User("leo", "Leonardo", "Jones", "leoj@a.com"), };

  private final Map<String, User> userMap;

  public Users() {

    userMap = new HashMap<String, User>();

    for (User user : users) {
      userMap.put(user.getUsername(), user);
    }
  }

  /**
   * The number of users in the database
   */
  public int size() {
    return userMap.size();
  }

  /**
   * Given a number, return the user
   */
  public User get(int index) {
    return users[index];
  }

  /**
   * Given the user's name return the User details
   */
  public User get(String username) {
    return userMap.get(username);
  }

  /**
   * Return the user names.
   */
  public Set<String> getUserNames() {
    return userMap.keySet();
  }
}

Users is essentially a thin wrapper around a Map. Here is the User class:

import java.io.Serializable;

public class User implements Serializable {

  private static final long serialVersionUID = 1L;
  private final String username;
  private final String firstName;
  private final String lastName;
  private final String email;

  public User(String username, String firstName, String lastName, String email) {
    super();
    this.username = username;
    this.firstName = firstName;
    this.lastName = lastName;
    this.email = email;
  }

  public String getUsername() {
    return username;
  }

  public String getFirstName() {
    return firstName;
  }

  public String getLastName() {
    return lastName;
  }

  public String getEmail() {
    return email;
  }

  @Override
  public String toString() {

    StringBuilder sb = new StringBuilder("User: ");
    sb.append(username);
    sb.append(" ");
    sb.append(firstName);
    sb.append(" ");
    sb.append(lastName);
    sb.append(" ");
    sb.append(email);

    return sb.toString();
  }
}

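User must implement the Serializable interface, because its instances are stored in the grid and have to travel between nodes.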
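To see what the Serializable requirement buys you, here is a minimal sketch that turns an object into bytes and back, which is what has to happen whenever an object crosses a process boundary. It assumes plain Java serialization; Hazelcast's own wire format and serialization service are not shown. SerializationSketch and SketchUser are hypothetical names, with SketchUser standing in for a cut-down User.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationSketch {

  /** Hypothetical cut-down user, standing in for the article's User class. */
  static class SketchUser implements Serializable {
    private static final long serialVersionUID = 1L;
    final String username;

    SketchUser(String username) {
      this.username = username;
    }
  }

  /** Writes the value to a byte array and reads a copy back from those bytes. */
  static Object roundTrip(Serializable value) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    SketchUser copy = (SketchUser) roundTrip(new SketchUser("fred123"));
    System.out.println(copy.username); // prints fred123
  }
}
```

If SketchUser did not implement Serializable, writeObject would throw a NotSerializableException.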

The calling code looks like this:

import java.util.concurrent.TimeUnit;

public class Main {

  public static void main(String[] args) throws InterruptedException {

    BigWideWorld theWorld = new BigWideWorld();

    MyApplication application = new MyApplication();

    while (true) {

      String username = theWorld.nextUser();

      if (application.isLoggedOn(username)) {
        application.logout(username);
      } else {
        application.logon(username);
      }

      application.displayUsers();
      TimeUnit.SECONDS.sleep(2);
    }
  }

}

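This code creates an instance of BigWideWorld and an instance of MyApplication. It then loops forever, grabbing the next random user name. If that user is already logged on, it logs the user out; otherwise it logs the user on. It then displays the logged-on users and sleeps for two seconds.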
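The MyApplication class is not listed here, so the following is only a sketch of what such a class might look like. Everything in it is an assumption: the class name MyApplicationSketch, the use of String values instead of User objects, and the constructor that takes the map as a parameter. In a Hazelcast setup the map would be the distributed one obtained from a HazelcastInstance through getMap(...); here a plain ConcurrentHashMap stands in so that the example runs without a cluster.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the application class used by Main; not the author's code. */
class MyApplicationSketch {

  // Assumption: with Hazelcast this would be the cluster-wide map, for example
  // Hazelcast.newHazelcastInstance().getMap("Users"); the map name is made up.
  private final Map<String, String> loggedOnUsers;

  MyApplicationSketch(Map<String, String> loggedOnUsers) {
    this.loggedOnUsers = loggedOnUsers;
  }

  boolean isLoggedOn(String username) {
    return loggedOnUsers.containsKey(username);
  }

  void logon(String username) {
    // A String stands in for the User object that a real application would store.
    loggedOnUsers.put(username, "User: " + username);
  }

  void logout(String username) {
    loggedOnUsers.remove(username);
  }

  void displayUsers() {
    System.out.println("Logged on users:");
    for (String details : loggedOnUsers.values()) {
      System.out.println(details);
    }
    System.out.println(loggedOnUsers.size());
  }
}

class MyApplicationSketchDemo {
  public static void main(String[] args) {
    // One shared map plays the role of the map that all cluster members see.
    Map<String, String> shared = new ConcurrentHashMap<>();
    MyApplicationSketch first = new MyApplicationSketch(shared);
    MyApplicationSketch second = new MyApplicationSketch(shared);

    first.logon("fred123");
    System.out.println(second.isLoggedOn("fred123")); // prints true

    second.logout("fred123");
    System.out.println(first.isLoggedOn("fred123")); // prints false
  }
}
```

Passing the map in through the constructor is a design choice made for this sketch: it keeps the login logic independent of how the map is created, so the same class can be exercised with a local map or with a distributed one.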

Run the code:

java -cp /path/to/hazelcast-3.1/lib/hazelcast-3.1.jar:. com.captaindebug.hazelcast.gettingstarted.Main

This produces output like the following:

Logged on users:
User: fred123 Fred Jones fredj@a.com
User: jimj James Jones jimj@a.com
User: shaz Sharon Jones shazj@a.com
User: paula Paula Jones pauj@a.com
User: lucy Lucy Jones lucyj@a.com
User: jez Jerry Jones fredj@a.com
User: jim Jim Jones jimj@a.com
7 -- 14:54:16-17

You can open several more terminals and run the same code in each.

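You will see users continually logging on and off, and the map of users changes every time it is displayed. The key point is that a change to the map's size in one application shows up in the applications running in the other windows, as if they all shared a single map of logged-on users.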

 

Publisher and Subscriber Implementation

Suppose we have the following model:

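import java.io.Serializable;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;

public class StockPrice implements Serializable {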

  private static final long serialVersionUID = 1L;

  private final BigDecimal bid;

  private final BigDecimal ask;

  private final String code;

  private final String description;

  private final long timestamp;

  /**
   * Create a StockPrice for the given stock at a given moment
   */
  public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description,
      long timestamp) {
    super();
    this.bid = bid;
    this.ask = ask;
    this.code = code;
    this.description = description;
    this.timestamp = timestamp;
  }

  public BigDecimal getBid() {
    return bid;
  }

  public BigDecimal getAsk() {
    return ask;
  }

  public String getCode() {
    return code;
  }

  public String getDescription() {
    return description;
  }

  public long getTimestamp() {
    return timestamp;
  }

  @Override
  public String toString() {

    StringBuilder sb = new StringBuilder("Stock - ");
    sb.append(code);
    sb.append(" - ");
    sb.append(description);
    sb.append(" - Bid: ");
    sb.append(bid);
    sb.append(" - Ask: ");
    sb.append(ask);
    sb.append(" - ");
    // Hours, minutes, seconds. In a pattern, "MM" means month and "SS" means milliseconds.
    SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");
    sb.append(df.format(new Date(timestamp)));
    return sb.toString();
  }
}

The requirement is that market makers can publish a stock's bid and ask prices at any time.

The publisher code:

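import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;

public class MarketMaker implements Runnable {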

  private static Random random = new Random();

  private final String stockCode;

  private final String description;

  private final ITopic<StockPrice> topic;

  private volatile boolean running;

  public MarketMaker(String topicName, String stockCode, String description) {
    this.stockCode = stockCode;
    this.description = description;
    this.topic = createTopic(topicName);
    running = true;
  }

  @VisibleForTesting
  ITopic<StockPrice> createTopic(String topicName) {
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    return hzInstance.getTopic(topicName);
  }

  public void publishPrices() {

    Thread thread = new Thread(this);
    thread.start();
  }

  @Override
  public void run() {

    do {
      publish();
      sleep();
    } while (running);
  }

  private void publish() {

    StockPrice price = createStockPrice();
    System.out.println(price.toString());
    topic.publish(price);
  }

  @VisibleForTesting
  StockPrice createStockPrice() {

    double price = createPrice();
    DecimalFormat df = new DecimalFormat("#.##");

    BigDecimal bid = new BigDecimal(df.format(price - variance(price)));
    BigDecimal ask = new BigDecimal(df.format(price + variance(price)));

    StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description,
        System.currentTimeMillis());
    return stockPrice;
  }

  private double createPrice() {

    int val = random.nextInt(2010 - 1520) + 1520;
    double retVal = (double) val / 100;
    return retVal;
  }

  private double variance(double price) {
    return (price * 0.01);
  }

  private void sleep() {
    try {
      TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void stop() {
    running = false;
  }

  public static void main(String[] args) throws InterruptedException {

    MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom");
    MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys");
    MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petroleum");

    bt.publishPrices();
    cbry.publishPrices();
    bp.publishPrices();

  }

}
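The price arithmetic in createStockPrice() is worth a closer look. createPrice() draws a whole number of hundredths in the range 1520 to 2009 and divides by 100, and the bid and ask sit 1% below and above that price, rounded to two decimal places. The listing rounds by formatting with DecimalFormat and parsing the result back; since DecimalFormat uses the default locale's decimal separator, that string may not parse with new BigDecimal(String) on a locale that writes decimals with a comma. The sketch below does the same arithmetic with BigDecimal only. PriceSketch and its method names are hypothetical, and unlike the "#.##" pattern it always keeps two decimal places.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helper mirroring MarketMaker's arithmetic; not part of the article's code. */
class PriceSketch {

  private static final BigDecimal ONE_PERCENT = new BigDecimal("0.01");

  /** Turns a whole number of hundredths into a price, e.g. 1520 becomes 15.20. */
  static BigDecimal price(int hundredths) {
    return BigDecimal.valueOf(hundredths, 2);
  }

  /** Bid is the price minus 1%, rounded to two decimal places. */
  static BigDecimal bid(BigDecimal price) {
    return price.subtract(price.multiply(ONE_PERCENT)).setScale(2, RoundingMode.HALF_EVEN);
  }

  /** Ask is the price plus 1%, rounded to two decimal places. */
  static BigDecimal ask(BigDecimal price) {
    return price.add(price.multiply(ONE_PERCENT)).setScale(2, RoundingMode.HALF_EVEN);
  }

  public static void main(String[] args) {
    BigDecimal price = price(1520);
    System.out.println(bid(price)); // prints 15.05
    System.out.println(ask(price)); // prints 15.35
  }
}
```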

The key part of the code is:

  ITopic<StockPrice> createTopic(String topicName) {
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    return hzInstance.getTopic(topicName);
  }

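This sets up Hazelcast and creates a topic on which stock prices are published. Note that every call to Hazelcast.newHazelcastInstance() starts a new cluster member. The actual publishing happens in publish(), which run() calls in a loop, through topic.publish(price).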

The subscriber code:

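import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;

public class Client implements MessageListener<StockPrice> {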

  public Client(String topicName) {
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    ITopic<StockPrice> topic = hzInstance.getTopic(topicName);
    topic.addMessageListener(this);
  }

  /**
   * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message)
   */
  @Override
  public void onMessage(Message<StockPrice> arg0) {
    System.out.println("Received: " + arg0.getMessageObject().toString());
  }

  public static void main(String[] args) {

    new Client("STOCKS");
  }

}
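To make the publish/subscribe flow concrete without a running cluster, here is a minimal in-process stand-in for a topic. LocalTopic and its Listener interface are hypothetical names and not Hazelcast types. A Hazelcast topic delivers messages to listeners registered on any cluster member, whereas this sketch only calls listeners inside the same JVM, but the shape of the interaction is the same: subscribers register a listener, and each published message is handed to every listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical in-process stand-in for a topic; not the Hazelcast ITopic. */
class LocalTopic<T> {

  /** Hypothetical callback, playing the role of a message listener. */
  interface Listener<T> {
    void onMessage(T message);
  }

  // CopyOnWriteArrayList allows listeners to be added while a publish is in progress.
  private final List<Listener<T>> listeners = new CopyOnWriteArrayList<>();

  void addMessageListener(Listener<T> listener) {
    listeners.add(listener);
  }

  /** Hands the message to every registered listener, in registration order. */
  void publish(T message) {
    for (Listener<T> listener : listeners) {
      listener.onMessage(message);
    }
  }
}

class LocalTopicDemo {
  public static void main(String[] args) {
    LocalTopic<String> topic = new LocalTopic<>();
    List<String> received = new ArrayList<>();

    // In this sketch a listener only sees messages published after it subscribed.
    topic.publish("published before anyone subscribed");
    topic.addMessageListener(message -> received.add("Received: " + message));
    topic.publish("BT.L bid 15.05 ask 15.35");

    System.out.println(received); // prints [Received: BT.L bid 15.05 ask 15.35]
  }
}
```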

To run this you need two JARs on the classpath: hazelcast-3.1.jar and guava-13.0.1.jar. Start the publisher:

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker

Then start the subscriber:

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client


 

The source code for this example can be downloaded from GitHub.
