Source: recipes/oneForAll.js

import { Observable } from 'rxjs';
import { observeRemoveNode } from '../util';

/**
* Generates an observable for an all for one action
* @param {Object} options
* @param {Object} options.client - The node-zookeeper-client instance
* @param {Object} options.path - The path on which to get the lock
* @param {string} options.clientNodePrefix - The client node prefix for this action
* @param {function} options.action
* @param {function} options.testActionDone
*/
export function observeOneForAllAction(options) {
  const { client, path, clientNodePrefix, action, testActionDone } = options;
  return this.observeSeekClientNodeState({
    client,
    path,
    clientNodePrefix,
    repeat: async (nodes) => {
      const done = await Promise.resolve(testActionDone());
      if (done) {
        return Observable.of(false);
      }
      return observeRemoveNode(client, nodes[0]).map(() => true);
    },
  }).distinctUntilChanged()
    .flatMap((isLeader) => {
      if (isLeader) {
        action();
      }
    });
}