Source: client.js

import { Observable } from 'rxjs';

import {
  generateClientId,
  getClientNodePrefix,
  observeDelay,
  makeRetryable,
  observeClientState,
} from './util';

import { observeExclusiveLock } from './recipes/lock';
// import { observeOneForAllAction } from './recipes/oneForAll';

/**
* The client object for recipes
*/
export default class RecipesClient {

  /**
  * constructor
  * @param {Object} options
  * @param {function} options.clientFactory - Creates a node-zookeeper-client
  * @param {Object} [options.retryOptions={}] - The retry options
  */
  constructor(options = {}) {
    const { retryOptions = {}, clientFactory } = options;

    // this observable produces retry delays per the passed-in retry options
    this.retryDelay$ = observeDelay(retryOptions);

    // this observable emits clients and connected/readonly status. nonrecoverable
    // states are handled by creating new clients that are then emitted into
    // the stream. this is a hot observable that will replay the last value to
    // new subscribers. this stream only terminates when the retries are exhausted.
    this.clientState$ = makeRetryable(observeClientState(clientFactory), this.retryDelay$)
      .publishReplay(1);
  }

  /**
  * Connects the observable
  */
  connect() {
    this.clientStateSubscription = this.clientState$.connect();
  }

  /**
  * A helper function that observes a client-based state that is only valid
  * when the client is connected.
  * @param {function} connectedSelector - called to flat map clients to states
  * @param {function} disconnectedValue - the value to be returned when the client
  * is disconnected
  */
  observeConnectedState(connectedSelector, disconnectedValue = false) {
    return this.clientState$.flatMap(({ client, connected }) => {
      if (connected) {
        return connectedSelector(client);
      }
      return Observable.of(disconnectedValue);
    });
  }

  /**
  * Attempts to acquire a lock and the given path and observes the lock state
  * @param {string} path - The path at which to observe
  * @param {string} [clientId] - The client id to use, one is allocated if not supplied
  */
  observeExclusiveLock(path, clientId = null) {
    const clientNodePrefix = getClientNodePrefix(clientId || generateClientId());
    return this.observeConnectedState(
      client => observeExclusiveLock({ client, path, clientNodePrefix }),
    );
  }

  /**
  * Returns promise that resolves when exclusive lock is acquired
  * @param {string} path - The path at which to observe
  * @param {string} [clientId] - The client id to use, one is allocated if not supplied
  */
  getExclusiveLock(path, clientId = null) {
    return this.observeExclusiveLock({ path, clientId })
      .first(hasLock => hasLock)
      .toPromise();
  }

  /**
  * Unsubscribes
  */
  unsubscribe() {
    if (this.clientStateSubscription) {
      this.clientStateSubscription.unsubscribe();
      this.clientStateSubscription = null;
    }
  }

}