Source: util.js

import EventEmitter from 'events';
import pify from 'pify';
import uuid from 'uuid';
import { Observable } from 'rxjs';
import { State, CreateMode } from 'node-zookeeper-client';

import { ExpiredError, AuthenticationFailedError, InvalidStateError } from './errors';

/**
* Name of the event emitted on the internal EventEmitter that observeNodeValue
* creates when watching a node. Kept as a constant so the string literal is
* not repeated between the emitter and the listener.
*/
const changeEventName = 'change';

/**
* Builds an observable of retry delay values from the supplied retry options.
* The first value is `initialDelay`; each following value is the previous one
* multiplied by `delayFactor` and capped at `maxDelay`. Exactly `maxRetries`
* values are emitted before the observable completes.
* @param {Object} [options={}] - The retry options
* @param {number} [options.initialDelay=500] - The first delay value
* @param {number} [options.delayFactor=2] - The multiplier applied to get each next delay
* @param {number} [options.maxDelay=8000] - The cap applied to every delay after the first
* @param {number} [options.maxRetries=7] - The number of delay values to emit
* @returns {Observable} An observable of delay values
*/
export function observeDelay(options = {}) {
  const {
    initialDelay = 500,
    delayFactor = 2,
    maxDelay = 8000,
    maxRetries = 7,
  } = options;

  // the generator state carries the attempt counter alongside the current delay
  const firstStep = { attempt: 0, delay: initialDelay };
  const hasRetriesLeft = step => step.attempt < maxRetries;
  const advance = step => ({
    attempt: step.attempt + 1,
    delay: Math.min(step.delay * delayFactor, maxDelay),
  });
  const toDelay = step => step.delay;

  return Observable.generate(firstStep, hasRetriesLeft, advance, toDelay);
}

/**
* Wraps an observable so that it is resubscribed after an error, waiting for
* the next value of the delay observable before each retry.
* @param {Observable} obs$ - The input observable
* @param {Observable} delay$ - The delays to wait before each successive retry
* @param {function} [retryPredicate] - Called with the error; a retry only happens
* when it returns a truthy value. If not supplied, all errors are retried.
* @returns {Observable} The retryable observable; it fails with the original error
* once the delays are used up or the predicate rejects the error
*/
export function makeRetryable(obs$, delay$, retryPredicate = null) {
  // decides, for one (error, delay) pair, whether to wait and retry or to give up
  const retryOrFail = ([error, delay]) => {
    // a null delay is the end marker appended below: no retries are left
    if (delay === null) {
      return Observable.throw(error);
    }
    if (retryPredicate && !retryPredicate(error)) {
      return Observable.throw(error);
    }
    return Observable.of(null).delay(delay);
  };

  return obs$.retryWhen((error$) => {
    // pair each error with the next delay; the extra trailing null lets us
    // recognise the error that arrives after the last real delay was consumed
    const delayOrEnd$ = delay$.concat(Observable.of(null));
    return Observable.zip(error$, delayOrEnd$).flatMap(retryOrFail);
  });
}

/**
* A generic state seeking loop. Subscribes to the state observable and emits
* its values, optionally transformed by `select`. Each time that sequence
* completes, `repeat` is called with the last state and the last emitted
* result; when the observable it returns emits a truthy value the state
* observable is subscribed to again, and when it emits a non-truthy value the
* loop ends.
* @param {Object} options
* @param {Observable} options.state$ - The observable of state values
* @param {function} options.repeat - Called with (lastState, lastResult); returns
* an observable (or other flatMap-compatible value) signalling when to reevaluate
* @param {function} [options.select] - Maps a state to the result to emit; applied
* with flatMap so it may be asynchronous
* @returns {Observable}
*/
export function observeSeekState(options) {
  const { state$, select, repeat } = options;

  // The repeat callback needs the most recent state and result, and repeatWhen
  // offers no way to pass them along, so they are remembered here as a side
  // effect that stays scoped to this function.
  let lastState = null;
  let lastResult = null;
  const rememberState = (state) => {
    lastState = state;
  };
  const rememberResult = (result) => {
    lastResult = result;
  };

  const tracked$ = state$.do(rememberState);

  // flatMap (rather than map) so that select may return a promise/observable
  const result$ = select ? tracked$.flatMap(select) : tracked$;

  return result$
    .do(rememberResult)
    .repeatWhen(completion$ => completion$
      // flatMap so that repeat may be asynchronous as well
      .flatMap(() => repeat(lastState, lastResult))
      // the first non-truthy value completes the notifier, which ends the loop
      .takeWhile(Boolean));
}

/**
* Creates an observable of the children of a node. On subscription the parent
* path is created if necessary (mkdirp) and the child names are then fetched
* and emitted as a single array. The filter and sort functions are passed the
* node names of the children, without the parent path.
* @param {Object} options
* @param {ZookeeperClient} options.client - The node-zookeeper-client instance
* @param {string} options.path - The path of the parent node
* @param {function} [options.filter] - Predicate called with each child name;
* only names for which it returns a truthy value are kept
* @param {function} [options.sort] - Comparator called with two child names
* @param {function} [options.watcher] - The watcher function handed to getChildren
* @returns {Observable} An observable emitting one array of child node names
*/
export function observeNodeChildren(options) {
  const { client, path, filter = null, sort = null, watcher = null } = options;

  // construct the observable children
  let children$ = Observable.defer(async () => {
    await pify(client.mkdirp).call(client, path);

    // getChildren calls back with (error, children, stat). It is wrapped by
    // hand so that exactly the children array is resolved: pify only returns
    // an array of all results when its multiArgs option is set, so
    // destructuring its result would pick the first child name instead.
    return new Promise((resolve, reject) => {
      client.getChildren(path, watcher, (error, children) => {
        if (error) {
          reject(error);
        } else {
          resolve(children);
        }
      });
    });
  });

  // filter and sort if supplied
  if (filter) {
    children$ = children$.map(children => children.filter(name => filter(name)));
  }
  if (sort) {
    // sort a copy so the array handed out by the client is never mutated
    children$ = children$.map(children => children.slice().sort((a, b) => sort(a, b)));
  }

  // return node names (not full paths!)
  return children$;
}

/**
* Creates an observable of a node value. The accessor is invoked once on
* subscription and, when watching, again every time the watcher it was handed
* is called.
* @param {Object} options
* @param {boolean} [options.watch=false] - Whether to watch the node
* @param {function} options.accessor - Called with the watcher function (or null
* when not watching); returns a promise/observable for the value we care about.
* @returns {Observable} An observable of the values produced by the accessor
*/
export function observeNodeValue(options) {
  const { watch = false, accessor } = options;
  return Observable.defer(() => {
    // a single null kicks off the initial read
    const initial$ = Observable.of(null);

    if (!watch) {
      return initial$.flatMap(() => accessor(null));
    }

    // when watching, every invocation of the watcher is turned into a change
    // event, and each change event triggers another read through the accessor
    const emitter = new EventEmitter();
    const notifyChange = () => emitter.emit(changeEventName);
    const change$ = Observable.fromEvent(emitter, changeEventName);
    return initial$.concat(change$).flatMap(() => accessor(notifyChange));
  });
}

/**
* Creates an observable that emits true once a node is found not to exist,
* i.e. has been removed, and then completes. The node is checked with
* client.exists on subscription and, when watching, again each time the
* watcher fires.
* NOTE: without watching, the node is checked only once; if it still exists
* nothing matches and RxJS `first` is expected to signal an empty-sequence error.
* @param {Object} options
* @param {ZookeeperClient} options.client - The node-zookeeper-client instance
* @param {string} options.path - The path of the node whose removal is awaited
* @param {boolean} [options.watch=false] - Whether to watch the node
* @returns {Observable} An observable emitting a single true
*/
export function observeRemoveNode(options) {
  const { client, path, watch = false } = options;
  return observeNodeValue({
    watch,
    accessor: watcher => pify(client.exists).call(client, path, watcher),
  })
    // exists yields a falsy stat when the node is absent
    .first(stat => !stat)
    // emit the documented true rather than the falsy stat itself, so the value
    // can be used directly as a truthy "reevaluate" signal
    .mapTo(true);
}

/**
* Creates an observable that, on subscription, creates a node without data
* using the given create mode and emits whatever client.create reports back.
* @param {Object} options
* @param {ZookeeperClient} options.client - The node-zookeeper-client instance
* @param {string} options.path - The path of the node to create
* @param {CreateMode} options.mode - The create mode for the node
* @returns {Observable}
*/
export function observeCreateNode(options) {
  const { client, path, mode } = options;
  return Observable.defer(() => {
    // promisify lazily so nothing touches the client before subscription
    const createNode = pify(client.create);
    return createNode.call(client, path, null, mode);
  });
}

/**
* Generates a unique client id: a uuid with its dashes removed.
* @returns {string} - The unique client id
*/
export function generateClientId() {
  const dashedId = uuid();
  return dashedId.split('-').join('');
}

/**
* Builds the client node prefix for the given client id and (optional) prefix.
* The components are joined with dashes and the result always ends with a
* dash: `<clientId>-`, or `<prefix>-<clientId>-` when a prefix is supplied.
* @param {string} clientId - The client id
* @param {string} [prefix] - The optional prefix
* @returns {string}
*/
export function getClientNodePrefix(clientId, prefix = null) {
  // the trailing empty component is what produces the final dash
  const components = prefix ? [prefix, clientId, ''] : [clientId, ''];
  return components.join('-');
}

/**
* Parses a client node name, i.e. the node name without the full path. The
* name is split on dashes into `<client>` or `<type>-<client>`, followed by
* `-<sequence>` when the node is sequential. Returns an object with a client
* property and, if appropriate, type and sequence properties.
* @param {string} node - The node name to parse
* @param {boolean} sequential - Whether this node has a sequence number appended
* @returns {Object} The parsed `{ client, [type], [sequence] }` object
* @throws {InvalidStateError} When the name does not have the expected number of
* components or the sequence component is not a run of digits
*/
export function parseClientNode(node, sequential) {
  const invalidNode = () => new InvalidStateError(`Invalid node ${node} with sequential=${sequential} in parseClientNode`);

  const parts = node.split('-');
  // number of components in front of the (optional) sequence number
  const preSequentialCount = sequential ? parts.length - 1 : parts.length;
  const result = {};
  switch (preSequentialCount) {
    case 1:
      result.client = parts[0];
      break;
    case 2:
      result.type = parts[0];
      result.client = parts[1];
      break;
    default:
      throw invalidNode();
  }
  if (sequential) {
    const rawSequence = parts[preSequentialCount];
    // require digits only: Number('') is 0, so a missing sequence (e.g. a name
    // ending in a dash) would otherwise silently parse as sequence 0
    if (!/^\d+$/.test(rawSequence)) {
      throw invalidNode();
    }
    result.sequence = Number(rawSequence);
  }
  return result;
}

/**
* Comparator for client node names that orders them by ascending sequence
* number. Only applicable to sequential nodes; both names are parsed with
* parseClientNode, so its errors propagate for names it rejects.
* @param {string} node1 - The first node name
* @param {string} node2 - The second node name
* @returns {number} Negative, zero or positive, as expected by Array#sort
*/
export function sortClientNodesBySequence(node1, node2) {
  const { sequence: firstSequence } = parseClientNode(node1, true);
  const { sequence: secondSequence } = parseClientNode(node2, true);
  return firstSequence - secondSequence;
}

/**
* Creates an observable of client state using the given client factory. When
* subscribed to, this creates a new client, listens to its 'state' events and
* connects it. The observable elements are { client, connected, readonly }
* (readonly is omitted while disconnected) and reflect the current state of
* the client. Errors with AuthenticationFailedError or ExpiredError on those
* nonrecoverable states. Unsubscribing removes the client's listeners and
* closes the client.
* @param {function} clientFactory - Function that returns a node-zookeeper-client instance
* @returns {Observable}
*/
export function observeClientState(clientFactory) {
  return Observable.create((observer) => {
    // every subscription gets its own client
    const client = clientFactory();

    // translate a client state event into an observable notification
    const handleState = (state) => {
      const { code } = state;
      if (code === State.SYNC_CONNECTED.code) {
        observer.next({ client, connected: true, readonly: false });
        return;
      }
      if (code === State.CONNECTED_READ_ONLY.code) {
        observer.next({ client, connected: true, readonly: true });
        return;
      }
      if (code === State.DISCONNECTED.code) {
        observer.next({ client, connected: false });
        return;
      }
      if (code === State.AUTH_FAILED.code) {
        observer.error(new AuthenticationFailedError());
        return;
      }
      if (code === State.EXPIRED.code) {
        observer.error(new ExpiredError());
      }
      // any other state (e.g. SASL_AUTHENTICATED) is ignored
    };

    // listen before connecting so no state event is missed
    client.on('state', handleState);
    client.connect();

    // teardown: detach from the client and close it
    return () => {
      client.removeAllListeners();
      client.close();
    };
  });
}

/**
* Returns a selector function that takes the list of node names and returns an
* observable emitting true iff the first of the supplied nodes starts with the
* prefix, i.e. if the client with that prefix is in the "leader" position, and
* false otherwise (including for an empty list).
* @param {string} clientNodePrefix - The client node prefix
* @returns {function} A function of (nodes) returning an Observable of boolean
*/
export function leaderSelector(clientNodePrefix) {
  // compare the length explicitly so an empty list yields false rather than 0
  return nodes => Observable.of(nodes.length > 0 && nodes[0].startsWith(clientNodePrefix));
}

/**
* Runs the state seeking loop over the children of a path, sorted by sequence
* number. On each cycle the selected result is emitted (by default: whether
* the node with this client's prefix is first). Before repeating, a node for
* this client is created (ephemeral, sequential) if none of the children
* starts with the client node prefix; otherwise the supplied repeat function
* decides when to reevaluate.
* @param {Object} options
* @param {ZookeeperClient} options.client - The node-zookeeper-client instance
* @param {string} options.path - The path of the parent node whose children are observed
* @param {string} options.clientNodePrefix - The client node prefix for this action
* @param {function} [options.select] - Maps the sorted node names to the result to
* emit; defaults to leaderSelector(clientNodePrefix)
* @param {function} options.repeat - Called with (nodes, result, clientIndex) once
* this client's node exists; returns the signal for when to reevaluate
* @returns {Observable}
*/
export function observeSeekClientNodeState(options) {
  const { client, path, clientNodePrefix, select, repeat } = options;

  const isOwnNode = node => node.startsWith(clientNodePrefix);

  // either registers this client's node or defers to the caller's repeat logic
  const registerOrRepeat = (nodes, result) => {
    const clientIndex = nodes.findIndex(isOwnNode);
    if (clientIndex >= 0) {
      return repeat(nodes, result, clientIndex);
    }
    // no node for this client yet: create it, which also triggers a new cycle
    return observeCreateNode({
      client,
      path: [path, clientNodePrefix].join('/'),
      mode: CreateMode.EPHEMERAL_SEQUENTIAL,
    });
  };

  const state$ = observeNodeChildren({
    client,
    path,
    sort: sortClientNodesBySequence,
  });

  return observeSeekState({
    client,
    state$,
    select: select || leaderSelector(clientNodePrefix),
    repeat: registerOrRepeat,
  });
}