import EventEmitter from 'events';
import pify from 'pify';
import uuid from 'uuid';
import { Observable } from 'rxjs';
import { State, CreateMode } from 'node-zookeeper-client';
import { ExpiredError, AuthenticationFailedError, InvalidStateError } from './errors';
/**
 * Name of the event emitted on the internal EventEmitter that
 * observeNodeValue creates when watching a node. Kept as a constant
 * to avoid repeating the string literal.
 */
const changeEventName = 'change';
/**
 * Builds an observable of retry delays from the supplied retry options.
 * The first value emitted is `initialDelay`; every following value is the
 * previous one multiplied by `delayFactor` and capped at `maxDelay`. The
 * observable completes after `maxRetries` values.
 * @param {Object} [options]
 * @param {number} [options.initialDelay=500] - The first delay emitted
 * @param {number} [options.delayFactor=2] - Multiplier used to derive each following delay
 * @param {number} [options.maxDelay=8000] - Cap applied to every derived delay
 * @param {number} [options.maxRetries=7] - The number of delay values emitted
 * @returns {Observable}
 */
export function observeDelay(options = {}) {
  const {
    initialDelay = 500,
    delayFactor = 2,
    maxDelay = 8000,
    maxRetries = 7,
  } = options;

  // generator state is { attempt, delay }; only the delay is exposed
  const hasRetriesLeft = ({ attempt }) => attempt < maxRetries;
  const advance = ({ attempt, delay }) => ({
    attempt: attempt + 1,
    delay: Math.min(delay * delayFactor, maxDelay),
  });
  const pickDelay = ({ delay }) => delay;

  return Observable.generate(
    { attempt: 0, delay: initialDelay },
    hasRetriesLeft,
    advance,
    pickDelay,
  );
}
/**
 * Wraps an observable so that it is resubscribed after an error, waiting
 * for the next value of the delay observable before each retry.
 * @param {Observable} obs$ - The input observable
 * @param {Observable} delay$ - The delays to wait before each retry; its
 * length bounds the number of retries
 * @param {function} [retryPredicate] - Receives the error and returns whether
 * a retry should occur. If not supplied, all errors are retried.
 * @returns {Observable}
 */
export function makeRetryable(obs$, delay$, retryPredicate = null) {
  return obs$.retryWhen((error$) => {
    // a trailing null is appended to the delays: pairing an error with that
    // null means the supplied delays are used up
    const delayOrEnd$ = delay$.concat(Observable.of(null));
    return Observable.zip(error$, delayOrEnd$).flatMap(([error, delay]) => {
      if (delay === null) {
        // out of retries, surface the error
        return Observable.throw(error);
      }
      if (retryPredicate && !retryPredicate(error)) {
        // the caller does not want this error retried
        return Observable.throw(error);
      }
      // emit after the delay, which triggers the retry
      return Observable.of(null).delay(delay);
    });
  });
}
/**
 * A generic state seeking loop. Subscribes to the state observable, optionally
 * transforms each state with `select`, and emits the result. Each time that
 * sequence completes, `repeat` is called with the most recent state and result;
 * a truthy emission from it restarts the sequence, a non-truthy emission ends
 * the loop.
 * @param {Object} options
 * @param {Observable} options.state$ - The source of state values
 * @param {function} options.repeat - Called as repeat(lastState, lastResult);
 * returns a value accepted by flatMap (e.g. an observable or promise)
 * @param {function} [options.select] - Maps a state to the emitted result;
 * applied with flatMap so it may be async
 * @returns {Observable}
 */
export function observeSeekState(options) {
  const { state$, select, repeat } = options;

  // the repeat callback needs the latest state and result, and repeatWhen
  // offers no way to hand them over, so they are remembered here. this
  // side-effecting bookkeeping stays local to this function.
  const latest = { state: null, result: null };

  const tracked$ = state$.do((state) => {
    latest.state = state;
  });

  // flatMap rather than map so that select may be async
  const selected$ = select ? tracked$.flatMap(select) : tracked$;

  return selected$
    .do((result) => {
      latest.result = result;
    })
    .repeatWhen(completion$ => completion$
      // flatMap so that repeat may be async
      .flatMap(() => repeat(latest.state, latest.result))
      // a non-truthy value completes the notifier, which ends the loop
      .takeWhile(value => value));
}
/**
 * Creates an observable of the children of a node. On each subscription the
 * parent path is created if needed (mkdirp), the children are fetched, and the
 * resulting list of node names (not full paths) is emitted.
 * @param {Object} options
 * @param {ZookeeperClient} options.client - The node-zookeeper-client instance
 * @param {string} options.path - The path of the parent node
 * @param {function} [options.filter] - Receives the list of child node names
 * and returns the list to emit
 * @param {function} [options.sort] - Comparator passed to Array#sort on the
 * list of child node names
 * @param {function} [options.watcher] - The watcher function handed to getChildren
 * @returns {Observable}
 */
export function observeNodeChildren(options) {
  const { client, path, filter = null, sort = null, watcher = null } = options;

  // defer so that the zookeeper calls happen per subscription
  let children$ = Observable.defer(async () => {
    await pify(client.mkdirp).call(client, path);
    // NOTE(review): the destructuring assumes the promisified getChildren
    // resolves with an array of callback arguments ([children, stat]) --
    // confirm against the pify version in use
    const [children] = await pify(client.getChildren).call(client, path, watcher);
    return children;
  });

  // filter and sort if supplied
  if (filter) {
    // NOTE(review): filter is applied to the whole list, not per child
    children$ = children$.map(filter);
  }
  if (sort) {
    children$ = children$.map(children => children.sort(sort));
  }

  // node names, not full paths
  return children$;
}
/**
 * Creates an observable of a node value. On subscription the accessor is
 * invoked once; when watching, it is invoked again every time the watcher
 * function it was given is called.
 * @param {Object} options
 * @param {boolean} [options.watch=false] - Whether to watch the node
 * @param {function} options.accessor - Receives the watcher function (or null
 * when not watching) and returns a promise/observable for the value we care about.
 * @returns {Observable}
 */
export function observeNodeValue(options) {
  const { watch = false, accessor } = options;
  return Observable.defer(() => {
    // a single null kicks off the first read
    const initial$ = Observable.of(null);
    if (!watch) {
      return initial$.flatMap(() => accessor(null));
    }
    // calling the watcher emits a change event, and each change event
    // triggers another read through the accessor
    const emitter = new EventEmitter();
    const notifyChange = () => emitter.emit(changeEventName);
    const change$ = Observable.fromEvent(emitter, changeEventName);
    return initial$.concat(change$).flatMap(() => accessor(notifyChange));
  });
}
/**
 * Creates an observable that emits once a node no longer exists. The node is
 * checked with client.exists; the first falsy result is emitted and the
 * observable then completes.
 * NOTE(review): the emitted value is that falsy exists() result itself, not
 * `true` -- confirm that callers do not rely on a truthy emission.
 * @param {Object} options
 * @param {ZookeeperClient} options.client - The node-zookeeper-client instance
 * @param {string} options.path - The path of the node to check
 * @param {boolean} [options.watch=false] - Whether to watch the node and
 * re-check it whenever the watcher is called
 * @returns {Observable}
 */
export function observeRemoveNode(options) {
  const { client, path, watch = false } = options;
  const checkExists = watcher => pify(client.exists).call(client, path, watcher);
  const isMissing = stat => !stat;
  return observeNodeValue({
    client,
    path,
    watch,
    accessor: checkExists,
  }).first(isMissing);
}
/**
 * Creates an observable that, when subscribed to, creates a node with no
 * data (null) at the given path using the given create mode. It emits
 * whatever the promisified client.create call resolves with.
 * @param {Object} options
 * @param {ZookeeperClient} options.client - The node-zookeeper-client instance
 * @param {string} options.path - The path of the node to create
 * @param {CreateMode} options.mode - The create mode for the node
 * @returns {Observable}
 */
export function observeCreateNode(options) {
  const { client, path, mode } = options;
  // defer so that the node is only created on subscription
  return Observable.defer(() => {
    const createNode = pify(client.create);
    return createNode.call(client, path, null, mode);
  });
}
/**
 * Generates a unique client id: a uuid with its dashes stripped.
 * @returns {string} - The unique client id
 */
export function generateClientId() {
  const id = uuid();
  return id.split('-').join('');
}
/**
 * Generates the client node prefix for the given client id and (optional)
 * prefix. The result is `<clientId>-`, or `<prefix>-<clientId>-` when a
 * truthy prefix is supplied.
 * @param {string} clientId - The client id
 * @param {string} [prefix] - The optional prefix
 * @returns {string}
 */
export function getClientNodePrefix(clientId, prefix = null) {
  // the empty last component produces the trailing dash
  const components = prefix ? [prefix, clientId, ''] : [clientId, ''];
  return components.join('-');
}
/**
 * Parses a client node name, i.e. the node name without the full path. The
 * name is split on '-' into `client` or `type-client`, followed by a sequence
 * number when `sequential` is set. Returns an object with a client property
 * and, if appropriate, type and sequence properties.
 * @param {string} node - The node name to parse
 * @param {boolean} sequential - Whether this node has a sequence number appended
 * @returns {Object}
 * @throws {InvalidStateError} If the name does not have one or two components
 * before the sequence, or the sequence is not a number
 */
export function parseClientNode(node, sequential) {
  const parts = node.split('-');
  // when sequential, the last component is the sequence number
  const preSequentialCount = sequential ? parts.length - 1 : parts.length;
  const invalidNodeError = () => new InvalidStateError(`Invalid node ${node} with sequential=${sequential} in parseClientNode`);

  const result = {};
  switch (preSequentialCount) {
    case 1:
      result.client = parts[0];
      break;
    case 2:
      result.type = parts[0];
      result.client = parts[1];
      break;
    default:
      throw invalidNodeError();
  }

  if (sequential) {
    // note that Number('') is 0, so an empty sequence component parses as 0
    result.sequence = Number(parts[preSequentialCount]);
    if (Number.isNaN(result.sequence)) {
      throw invalidNodeError();
    }
  }
  return result;
}
/**
 * Comparator for client node names that orders them by ascending sequence
 * number. Only applicable to sequential nodes, since both names are parsed
 * as sequential.
 * @param {string} node1 - The first node name
 * @param {string} node2 - The second node name
 * @returns {number} Negative, zero or positive, per the Array#sort contract
 */
export function sortClientNodesBySequence(node1, node2) {
  const { sequence: firstSequence } = parseClientNode(node1, true);
  const { sequence: secondSequence } = parseClientNode(node2, true);
  return firstSequence - secondSequence;
}
/**
 * Creates an observable of client state using the given client factory. Each
 * subscription creates a new client, listens to its 'state' events and
 * connects it. Emitted elements are { client, connected, readonly } when
 * connected and { client, connected: false } when disconnected. The
 * observable errors with AuthenticationFailedError or ExpiredError on the
 * corresponding nonrecoverable states. Unsubscribing removes all listeners
 * from the client and closes it.
 * @param {function} clientFactory - Function that returns a node-zookeeper-client instance
 * @returns {Observable}
 */
export function observeClientState(clientFactory) {
  return Observable.create((observer) => {
    // every subscription gets its own client
    const client = clientFactory();

    // translate client state codes into emissions or errors
    const handleState = (state) => {
      const { code } = state;
      if (code === State.SYNC_CONNECTED.code) {
        observer.next({ client, connected: true, readonly: false });
      } else if (code === State.CONNECTED_READ_ONLY.code) {
        observer.next({ client, connected: true, readonly: true });
      } else if (code === State.DISCONNECTED.code) {
        observer.next({ client, connected: false });
      } else if (code === State.AUTH_FAILED.code) {
        observer.error(new AuthenticationFailedError());
      } else if (code === State.EXPIRED.code) {
        observer.error(new ExpiredError());
      }
      // any other state (e.g. SASL_AUTHENTICATED) is ignored
    };

    // listen first, then connect
    client.on('state', handleState);
    client.connect();

    // teardown: detach from the client and close it
    return () => {
      client.removeAllListeners();
      client.close();
    };
  });
}
/**
 * Returns a selector function that takes a list of node names and returns an
 * observable emitting true iff the first node starts with the supplied
 * prefix, i.e. if the client with that prefix is in the "leader" position.
 * An empty list emits false.
 * @param {string} clientNodePrefix - The client node prefix
 * @returns {function}
 */
export function leaderSelector(clientNodePrefix) {
  // compare the length explicitly so an empty list yields false rather than 0
  return nodes => Observable.of(nodes.length > 0 && nodes[0].startsWith(clientNodePrefix));
}
/**
 * Runs the state seeking loop over the sequence-sorted children of a path on
 * behalf of one client. After each pass, if no child starts with the client
 * node prefix, an ephemeral sequential node is created for the client under
 * the path and the result of that creation drives the loop; otherwise the
 * decision is delegated to the supplied repeat function.
 * @param {Object} options
 * @param {ZookeeperClient} options.client - The node-zookeeper-client instance
 * @param {string} options.path - The parent path whose children are observed
 * @param {string} options.clientNodePrefix - The client node prefix for this action
 * @param {function} [options.select] - Maps the sorted child names to the
 * emitted result; defaults to leaderSelector(clientNodePrefix)
 * @param {function} options.repeat - Called as repeat(nodes, result, clientIndex)
 * when the client already has a node; clientIndex is that node's position in nodes
 * @returns {Observable}
 */
export function observeSeekClientNodeState(options) {
  const { client, path, clientNodePrefix, select, repeat } = options;

  const isOwnNode = node => node.startsWith(clientNodePrefix);

  const children$ = observeNodeChildren({
    client,
    path,
    sort: sortClientNodesBySequence,
  });

  const repeatOrCreate = (nodes, result) => {
    const clientIndex = nodes.findIndex(isOwnNode);
    if (clientIndex >= 0) {
      // the client already has a node, let the caller decide
      return repeat(nodes, result, clientIndex);
    }
    // no node for this client yet, so create one
    return observeCreateNode({
      client,
      path: [path, clientNodePrefix].join('/'),
      mode: CreateMode.EPHEMERAL_SEQUENTIAL,
    });
  };

  return observeSeekState({
    client,
    state$: children$,
    select: select || leaderSelector(clientNodePrefix),
    repeat: repeatOrCreate,
  });
}