privateSubscription.js

/*
 * Copyright ConsenSys Software Inc.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. 
 * If a copy of the MPL was not distributed with this file, You can obtain one at 
 *
 * http://mozilla.org/MPL/2.0/
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: MPL-2.0
 */

const EventEmitter = require("events");

const Protocol = {
  HTTP: "HTTP",
  WEBSOCKET: "WebSocket"
};

const Event = {
  CONNECTED: "connected",
  DATA: "data",
  ERROR: "error"
};

/**
 * Manage a specific type of subscription
 * @param {PrivateSubscription} subscription
 */
function SubscriptionManager(subscription) {
  this.subscription = subscription;
  this.web3 = subscription.web3;

  return this;
}

/**
 * Manage creating/destroying filter and polling for new logs
 * using `priv_getFilterChanges`
 * @param {PrivateSubscription} subscription
 */
function PollingSubscription(subscription, pollingInterval) {
  SubscriptionManager.call(this, subscription);

  this.privacyGroupId = subscription.privacyGroupId;
  this.filter = subscription.filter;
  this.timeout = null;
  // How frequently to poll for new logs, in milliseconds
  this.pollingInterval = pollingInterval || 1000;

  return this;
}
PollingSubscription.prototype = Object.create(SubscriptionManager.prototype);
PollingSubscription.prototype.constructor = PollingSubscription;

PollingSubscription.prototype.subscribe = async function subscribe(
  privacyGroupId,
  filter,
  blockId
) {
  // install filter
  this.subscription.filterId = await this.web3.priv.createFilter(
    privacyGroupId,
    filter,
    blockId
  );

  // wait for new logs
  await this.pollForLogs(privacyGroupId, this.subscription.filterId);
};

PollingSubscription.prototype.getPastLogs = async function getPastLogs(
  privacyGroupId,
  filterId
) {
  return this.web3.priv.getFilterLogs(privacyGroupId, filterId);
};

PollingSubscription.prototype.pollForLogs = async function pollForLogs(
  privacyGroupId,
  filterId
) {
  const fetchLogs = async () => {
    try {
      const logs = await this.web3.priv.getFilterChanges(
        privacyGroupId,
        filterId
      );
      logs.forEach(log => {
        this.subscription.emit("data", log);
      });
      // continue
      this.timeout = setTimeout(() => {
        this.pollForLogs(privacyGroupId, filterId);
      }, this.pollingInterval);
    } catch (error) {
      this.subscription.emit("error", error);
    }
  };

  fetchLogs();
};

PollingSubscription.prototype.unsubscribe = async function unsubscribe(
  privacyGroupId,
  filterId,
  callback
) {
  return this.web3.priv
    .uninstallFilter(privacyGroupId, filterId)
    .then(() => {
      if (this.timeout != null) {
        clearTimeout(this.timeout);
      }
      this.subscription.reset();

      if (callback != null) {
        callback(null, true);
      }
      return filterId;
    })
    .catch(error => {
      if (callback != null) {
        callback(error);
      }
      return error;
    });
};

/**
 * Manage persistent pub-sub subscriptions over WebSocket
 * @param {PrivateSubscription} subscription
 */
function PubSubSubscription(subscription) {
  SubscriptionManager.call(this, subscription);
  return this;
}
PubSubSubscription.prototype = Object.create(SubscriptionManager.prototype);
PubSubSubscription.prototype.constructor = PubSubSubscription;

PubSubSubscription.prototype.subscribe = async function subscribe(
  privacyGroupId,
  filter
) {
  const websocketProvider = this.web3.currentProvider;
  // Register provider events to forward to the caller
  websocketProvider.on("connect", () => {
    console.log("CONNECTED");
    this.subscription.emit(Event.CONNECTED);
  });
  websocketProvider.on("data", data => {
    // Log is in `params` key of JSON-RPC response
    this.subscription.emit(Event.DATA, data.params);
  });
  websocketProvider.on("error", e => {
    this.subscription.emit(Event.ERROR, e);
  });

  // start subscription
  this.subscription.filterId = await this.web3.privInternal.subscribe(
    privacyGroupId,
    "logs",
    filter
  );
};

PubSubSubscription.prototype.getPastLogs = async function getPastLogs() {
  // noop - subscriptions don't get past logs
  return Promise.resolve([]);
};

PubSubSubscription.prototype.unsubscribe = async function unsubscribe(
  privacyGroupId,
  filterId,
  callback
) {
  return this.web3.privInternal
    .unsubscribe(privacyGroupId, filterId)
    .then(result => {
      this.subscription.reset();

      callback(null, result);
      return result;
    })
    .catch(error => {
      if (callback != null) {
        callback(error);
      }
      return error;
    });
};

/**
 * Controls the lifecycle of a private subscription
 * @param {*} web3
 * @param {*} privacyGroupId
 * @param {*} filter
 */
function PrivateSubscription(web3, privacyGroupId, filter) {
  this.privacyGroupId = privacyGroupId;
  this.filter = filter;

  this.web3 = web3;
  this.filterId = null;

  this.getPast = false;

  const providerType = web3.currentProvider.constructor.name;
  if (providerType === "HttpProvider") {
    this.protocol = Protocol.HTTP;
    this.manager = new PollingSubscription(
      this,
      this.web3.priv.subscriptionPollingInterval
    );
    // TODO: handle WebSockets if the node doesn't support priv_subscribe
  } else if (providerType === "WebsocketProvider") {
    this.protocol = Protocol.WEBSOCKET;
    this.manager = new PubSubSubscription(this);
  } else {
    throw new Error(
      "Current protocol does not support subscriptions. Use HTTP or WebSockets."
    );
  }

  return this;
}

// get functions from EventEmitter
PrivateSubscription.prototype = Object.create(EventEmitter.prototype);
PrivateSubscription.prototype.constructor = PrivateSubscription;

PrivateSubscription.prototype.subscribe = async function subscribe() {
  // If `fromBlock` is set, get previous logs when the user adds
  // a callback for the "data" event.
  if (this.filter.fromBlock != null) {
    this.getPast = true;
  }

  // Sets this.filterId
  await this.manager.subscribe(this.privacyGroupId, this.filter, this.blockId);
  if (this.filterId == null) {
    throw new Error("Failed to set filter ID");
  }

  return this.filterId;
};

PrivateSubscription.prototype.on = function on(eventName, callback) {
  // Register the callback
  EventEmitter.prototype.on.call(this, eventName, callback);

  // Get past logs if necessary once the user has added a callback
  if (this.getPast && eventName === "data") {
    // Execute asynchronously so we can return immediately
    // eslint-disable-next-line promise/catch-or-return
    this.manager
      .getPastLogs(this.privacyGroupId, this.filterId)
      .then(pastLogs => {
        pastLogs.forEach(log => {
          this.emit("data", log);
        });
        return pastLogs;
      });
  }

  return this;
};

PrivateSubscription.prototype.reset = function reset() {
  this.removeAllListeners();
};

PrivateSubscription.prototype.unsubscribe = async function unsubscribe(
  callback
) {
  return this.manager.unsubscribe(this.privacyGroupId, this.filterId, callback);
};

module.exports = {
  PrivateSubscription
};