Files
palemoon27/dom/push/PushServiceWebSocket.jsm
roytam1 116abd772d import changes from `dev' branch of rmottola/Arctic-Fox:
- Bug 1263951 - Avoid trying to initialize new GMP instances once the browser enters shutdown. r=cpearce (3ca153c7f4)
- Bug 1268714 - Check for failure result or a failed nsresult from SendLoadGMP. r=mccr8 (928546a72a)
- remove redundant decl (0c7c81e384)
- Bug 1161339 - Add gtest calling rust code. r=cajbir (9c0a4982d7)
- Bug 1269249: [MSE] P1. Clamp range to media source duration when media source is ended. r=jwwang (dfc42686b5)
- Bug 1269178: P4. Add mochitest. r=gerald (37fe5f9232)
- Bug 1269249: [MSE] P2. Add mochitest verifying behavior. r=jwwang (f50fb0d648)
- Bug 1245052 - various media b2g build errors r=jya (1fc0f3b8f1)
- Bug 1239598 - Fix potential deadlock and race condition r=bechen (dfebc1b9c5)
- Bug 1185931 - Add assert(mDecoderStateMachine). r=jwwang (16a79dd863)
- bits of Bug 1160695 (a595535a04)
- Bug 1205209 - Check whether mStreamSource is null in MediaOmxReader. r=bechen (3e380b282d)
- Bug 1210286 - Fall back to converting SourceSurfaces (RGB) to NV12 in OMXCodecWrapper. r=jolin (b9e26a43ee)
- Bug 1090015 - Suppress multichar warnings in the OMX code. r=kinetik (13cea78721)
- missing bit of Bug 1137151: Remove ref-counting from |OMXVideoEncoder| (0abf3cccf3)
- Bug 1239610 - Remove GonkNativeWindowClient usage from OmxDecoder r=jolin (a1ccc2a40e)
- missing bit of 1198576 (c3284a3002)
- Bug 1267637: P1. Consider invalid an AudioData with more than 8 audio channels. r=gerald (9bacf3fa8d)
- Bug 1267637: P2. Ignore outright audio track considered invalid. r=gerald (d34b468c87)
- Bug 1267637: [opus] P3. Reject audio data with unsupported audio configuration. r=gerald (90be7f8e3a)
- Bug 1267637: [vorbis] P4. Reject audio data with unsupported audio configuration. r=gerald (2321df4669)
- Bug 1267637: [AT] P6. Reject audio data with unsupported channel configuration. r=gerald (48756a764b)
- Bug 1267637: [ffmpeg] P7. Reject audio data with unsupported channel configuration. r=gerald (bbf90018b5)
- Bug 1267637: [gonk] P8. Reject audio data with unsupported channel configuration. r=gerald (44043594f0)
- Bug 1199809 - Remove all references to unused task queue. r=jya (829bb54ce7)
- Bug 1199809 - Don't schedule decoder I/O task when there will be more input. r=bwu (284c8b28d4)
- Bug 1215441 - Skip flush before Init() is completed. r=sotaro (f59f4ae450)
- Bug 1207214 - Assert decoder attaches EOS flag to final output buffer. r=sotaro (da0fc1f41b)
- Bug 1217220 - use output timestamp to decide which item needs to be removed from waiting list. r=jya (b056c21000)
- Bug 1222919 - Make ProcessFlush() virtual. r=jya (bea87e8e8a)
- Bug 1259366 - Flush after eos of android::MediaCodec r=jolin (c8e1e038ad)
- Bug 1199809 - Reset last decoded frame time on looper thread to avoid race condition. r=jya (1d56c95439)
- cleanup (83701e29ea)
- Bug 1267637: [wmf] P9. Reject audio data with unsupported channel configuration. r=gerald (c1f32cf152)
- Bug 1264925: Force D3D9 when attempting to decode VP8 or VP9. r=mattwoodrow (2a7d853fe6)
- Bug 1162899 - Use sync message for in-process mozHasPendingMessage. r=fabrice (d28430d0b2)
- Bug 1235484 - Part 1: Refine radio state check in MmsService. r=bevistseng (5f00e3ec79)
- remove android, fix some missing tests (01b371eb39)
- Bug 1259148 - Notify content when the notification permission pop-up is dismissed by the user. r=past,wchen (ca2dfbf92f)
- Bug 1267357 Cycle collect NotificationPermissionRequest::mCallback. r=mccr8 (6d0086af08)
- Bug 1265828 - Remove persistent notifications from storage. r=wchen (230d6f8458)
- Bug 1254816 - Use IgnoredErrorResult instead of ErrorResult for rv in nsDOMOfflineResourceList::Length(). r=bz (581f5c69db)
- Bug 1140478: Free the string returned by PrintJSStack(), in android shutdown logging function. r=jorendorff (ebf6ef80d1)
- Bug 1191137 - fix Mulet and responsive design mode to send key events properly r=ochameau,jryans (d47db5bf2b)
- Bug 1267096 - Return early if we have no global when creating a Promise. r=smaug (d4075f38fe)
- Bug 1146418 - Promise API entry points should use NS_ASSERT_OWNINGTHREAD, r=baku (a43b49c451)
- Bug 1262069: Wrap promise resolution values before storing. r=bz (66ebb63ce1)
- Bug 1246073 - Fix unique constraint errors in the H2 backend when resubscribing. r=dragana (43c67dc6bc)
- Bug 1266433 - Clean up nsIPushNotifier static casts. r=dragana (eef5497c75)
- Bug 1266433 - Update the comments in the Push XPIDL interfaces. r=me (35cd32c385)
- Bug 1266433 - Indicate push subscriptions created by privileged code. r=dragana (824381dd69)
- Bug 1242436 - default value of ok = true in order to check the return of SendPush and SendPushSubscriptionChange. r=kitcambridge (da55effde0)
- Bug 1267889 - Always steal the error result in PushMessage::Json. r=dragana (a66bf171b6)
- Bug 1243778 - PushRecord::getLastVisit cannot rely on the Places url index anymore. r=kitcambridge (07c3bdc4db)
- Bug 1260499 - Handle incoming messages before push service is initialized. r=nalexander,jchen (d60ccda56b)
- Bug 1265915 - Remove adaptive pings from the Push WebSocket backend. r=dragana (c8de7d5dd3)
- Bug 1262559: Fix misspelled comment in dom/push/PushServiceWebSocket.jsm; r=jdm (a9d869773d)
- Bug 1265914 - Remove Push UDP wake-up. r=dragana (128cf912bb)
- Bug 1261634 - Update whitespace skipping for meta csp. r=dveditz (c6abb5c502)
- Bug 1227813 - CSP: Ignore unsafe-inline within style-src if hash or nonce specified. r=kmckinley (ab1db6e779)
- Bug 1192840 - Fix CSP report content-type. r=ckerschb (81c55b28f0)
- Bug 1262635 - Don't strip URIs of ftp: when sending reports. r=dveditz (77e9aacac1)
- Bug 1216365 - nsMixedContentBlocker should use innerMostURI for aContentLocation. r=tanvi (9134c28268)
- Bug 1260153 - remove unreachable code in nsMixedContentBlocker. if/else blocks above all return. r=ckerschb (da297bf7ff)
- Bug 1202374 - "Can't open apps any more". r=lissyx+mozillians (ed090dbfce)
- Bug 1216498 - Bump SettinsDB version in order to enable pin the web. r=mhenretty (e2d0e9986f)
- Bug 1226906 - Bump Settings BD version. r=gwagner (e063a2d482)
- Bug 1119727 - Make Settings Soft Lockup threshold configurable. r=gwagner (1f5d9d3a50)
- var-let (48c1155a0e)
- bit of Backed out 2 changesets (bug 1202902) (828b6981a8)
- Bug 1228673 - Clean-up 'Then' before ~MediaTimer. r=jya (e51663a94b)
- align tests (0fbfa6f14b)
- Bug 1152236: OMX codec should use AnnexB as input format. r=jya (fc7ed581ac)
- Bug 1153895 - Support audio AMR-WB for Gonk in MP4Reader. r=jya (d9a530979e)
- Bug 1174623 - Support mp3 in Gonk PDM. r=sotaro (1f6efd4397)
- Bug 1174166 - Support H.263 in Gonk PDM. r=sotaro (5634e66ce3)
- Bug 1251155 - Remove GLContext from VideoDataDecoder r=snorp (814629fcd9)
- Bug 1262456 - [2.1] Replace queue adapter with deque. r=snorp (80587b789d)
- Bug 1262456 - [1.1] Prevent interruption of the decoder shutdown procedure and early shutdown return. r=snorp (98b97f0b48)
- Bug 1248792 - [1.2] Replace MediaRawData raw pointers with RefPtr. r=snorp (c1c62c4dc6)
- Bug 1226730 - [1.1] Provide sample rate instead of bit depth in audio format creation. r=snorp (86e50d757f)
- Bug 1244292 - [1.2] Release decoder on init failure. r=snorp (48b18cf8dd)
- Bug 1267637: [android] P5. Reject audio data with unsupported audio configuration. r=gerald (cf74f43c71)
- Bug 1230784 - Don't copy SurfaceTexture contents when presenting video on Android r=esawin,jya (fbd0ffd1ea)
- Bug 1230768. r=jesup (1619923f64)
- Bug 1205164 - Make sure ShmemPool high water mark is logged in all allocators. r=jesup (1fa4710751)
- Bug 1205164 - Fix ShmemPool detection of failed allocations. r=jesup (070327ef38)
- Bug 1247933 - do not perform null check on aClient since we know for sure it's a valid pointert. r=sotaro (1ac12b4091)
- Bug 1200903 - Fix MediaSystemResourceService::RemoveRequests() r=cpearce (13676b47c7)
- Bug 1250083 - make sure value attributed to usPerDataChunk is floating point value. r=cpearce (af9c3bd65d)
- Bug 1250497: Initalised Values used in WaveDemuxer.cpp. r=cpearce (36a24774ed)
- Bug 1250293 - Fixed Coverity warning in WaveDemuxer.cpp. r=cpearce (f5a690fac4)
- bug 1220042 make AlignedTArray base class inheritance private r=jwwang (3dd9efaa68)
- Bug 877662 - Add AlignmentUtils.h r=padenot (82b7563dd7)
- bug 930257 schedule Analyser inactive check when sending last null chunk r=padenot (61e9b5cf0f)
- bug 1197028 add MOZ_IMPLICIT for AudioBlock constructor from base AudioChunk on CLOSED TREE (0e2a86c6b5)
- bug 1203380 add custom AudioBlock copy constructor and make AudioChunk conversion constructor explicit r=padenot (56f2e37d5b)
- bug 1205540 initialize mBufferFormat when constructing silent block r=padenot (078feb4af4)
- bug 1203380 ClearDownstreamMark() before returning AsMutableChunk() r=padenot (56ea1f589f)
- bug 1203380 tighten not-sharing assertion in ChannelFloatsForWrite() r=padenot (368e354a06)
- bug 1203380 add custom assignment operator to AudioBlock r=padenot (31f9f914d1)
- Bug 877662 - Align audio buffer allocations to 16 byte boundaries r=padenot (5f33e9dd0e)
- Bug 1173016 - Bustage fix: mark BasicWaveformCache's ctor as `explicit`, on a CLOSED TREE. (bb28bb93fc)
- Bug 1265397 - Add a length attribute to OfflineAudioContext. r=smaug (3c2cf2aee6)
- bug 1255618 remove AudioContext from global window at unlink r=Ehsan (5332c8b42b)
- bug 1222202 implement query interface to nsIMemoryReporter r=bz (e6662d2ba8)
- Bug 1259831 - Remove the auto-suspend logic for AudioContext. r=karlt (3376fb3209)
- Bug 1232326 - Uninitialised value use in AudioBufferInPlaceScale. r=dminor. (548cbbfa52)
- Bug 1240054 - Only rebuild BandLimitedTables if more partials are required r=padenot (39bf05e142)
- Bug 1216081 - OscillatorNodeEngine::mFinalFrequency is used uninitialised. r=padenot. (5989729ec6)
- Bug 1265405 - Use a dictionary to specify how PeriodicWave should be normalized (or not); r=padenot (f43e3f17ba)
- Bug 1267096 - Check the return value of Promise::Create in AudioContext::StartRendering. r=smaug (5efca6cfe8)
- Bug 1267579 - Unexpected result when using OscillatorNode with custom wave shape; r=padenot (ed62d1b496)
- Bug 1209904 - Optimize OscillatorNode when its frequency is not changin and it's using ::ComputeCustom. r=karlt (9a2246cc3f)
- Bug 877662 - Use SSE2 versions of AudioNodeEngine functions r=padenot (1efa0b2cf3)
- Bug 877662 - Add an SSE2 implementation of AudioNodeEngine.cpp functions. r=ehsan (ce5ff146e5)
- Bug 877662 - Update SSE2 versions of AudioNodeEngine functions r=padenot (ded51f436e)
- Bug 1266405 - AudioBufferSourceNode::CopyFromBuffer should not borrow unaligned buffers; r=padenot (a2880e5c97)
- bug 1227411 add some initial logging of AudioNode API use r=padenot (6d0febaf34)
- bug 1199561 delay offline buffer allocation until non-null input is received r=padenot (38d56f3e89)
- Bug 1110344 - Replace float by double in AudioTimelineEvent ctor to prevent a rounding issue. r=padenot (c358371cfa)
- Bug 1231124 - addded mCurve to constructor. r=smaug (572fed89d6)
- Bug 1232646 - initialize 3 variables: mCurve, mTimeConstant, mDuration. r=cpearce (e36b3dbb71)
- Bug 1069825 - Check if we compare two automation curves occuring at the same time during overlap checking. r=padenot (35624be622)
- bug 1227411 add WEB_AUDIO_API_LOG r=padenot (89de67e91b)
- bug 1189168 avoid main thread assertion accessing mNode in SizeOfIncludingThis() r=padenot (370df2ff5b)
- Bug 1266047 - Fix crash in mozilla::AudioBufferAddWithScale_SSE r=padenot (388de2edf6)
- Bug 1266772 - Unbreak FreeBSD build after bug 881587. r=dminor (e194878792)
- Bug 1266112 - Remove unnecessary alignment checks from AudioNodeEngine.cpp; r=padenot (284ac98016)
- Bug 881587 - Add SSE2 version of AudioNodeEngine.cpp routines added in bug 815643. r=tterribe (5532515d07)
- Bug 881587 - Use SSE2 version of AudioNodeEngine.cpp routines added in bug 815643. r=padenot (82100493a4)
- Bug 1105513 - Add a NEON version for AudioBlockPanStereoToStereo when aIsOnTheLeft is an array r=padenot (b102beb60d)
- Bug 1203836 - Properly handle silent chunks in AudioNodeExternalInputStream. r=karlt (0597e5c122)
- Bug 1265131, part 1 - update moz.build for Skia m51. r=jrmuizel (745537cf9b)
- Bug 1265131, part 2 - update SkiaGLGlue for Skia m51. r=jrmuizel (71a3ffc91e)
- Bug 1265131, part 3 - update Moz2d for Skia m51. r=jrmuizel (2129a455cb)
- Bug 1262745 - Fix tests for Canvas CSS/SVG Filters. r=mstange (93d3652ac0)
- Bug 1265131, part 4 - fix tests for Skia m51 update. r=jrmuizel (964ea5c037)
- Bug 1265131, part 5 - update Skia to m51 branch. r=jrmuizel (42da76e40e)
- Bug 1268816 - allow Skia to use C++11 features on platforms that have them. r=froydnj (ff7d9e46b6)
- Bug 1268816 - follow-up to fix #ifdef -> #if. r=me (b81d86c173)
- Bug 1267180. Don't draw emojis as paths when they are too big. r=lsalzman (61c3bd732c)
- Bug 1269247 - check that SkPaint has a typeface before using it. r=mchang (cf873c19b0)
- bug 1249738 - make sScreenConfigurationObservers a function static r=dhylands (ab698385c4)
- bug 1249738 - make sBatteryObservers a function static r=dhylands (eb205b1b64)
- bits of Bug 1265131, part 5 (1340103927)
- Bug 1248224 - backport of Skia GrPathUtils::QuadUVMatrix assertion fix. r=jrmuizel (bcf81bf241)
- bug 1249738 - make sNetworkObservers a function static r=dhylands (a5d03d4425)
2024-09-18 09:50:59 +08:00

1120 lines
33 KiB
JavaScript

/* jshint moz: true, esnext: true */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
const Cc = Components.classes;
const Ci = Components.interfaces;
const Cu = Components.utils;
const Cr = Components.results;
Cu.import("resource://gre/modules/AppConstants.jsm");
Cu.import("resource://gre/modules/Preferences.jsm");
Cu.import("resource://gre/modules/Promise.jsm");
Cu.import("resource://gre/modules/Services.jsm");
Cu.import("resource://gre/modules/Timer.jsm");
Cu.import("resource://gre/modules/XPCOMUtils.jsm");
const {PushDB} = Cu.import("resource://gre/modules/PushDB.jsm");
const {PushRecord} = Cu.import("resource://gre/modules/PushRecord.jsm");
const {
PushCrypto,
getCryptoParams,
} = Cu.import("resource://gre/modules/PushCrypto.jsm");
// gPowerManagerService is only defined on B2G builds. It is used by
// _acquireWakeLock() to create the "cpu" wake lock held while the socket
// connects; the wake lock helpers bail out early on every other platform.
if (AppConstants.MOZ_B2G) {
XPCOMUtils.defineLazyServiceGetter(this, "gPowerManagerService",
"@mozilla.org/power/powermanagerservice;1",
"nsIPowerManagerService");
}
// Database name, schema version and object store name handed to PushDB by
// newPushDB().
const kPUSHWSDB_DB_NAME = "pushapi";
const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
const kPUSHWSDB_STORE_NAME = "pushapi";
// WebSocket close code sent by the server to indicate that the client should
// not automatically reconnect.
const kBACKOFF_WS_STATUS_CODE = 4774;
// The three tables below map nsIPushErrorReporter constants (ack statuses,
// unsubscribe reasons, and delivery error reasons) to the numeric codes
// included in request payloads.
// Ack statuses -> 1xx codes.
const kACK_STATUS_TO_CODE = {
[Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
[Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
[Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
};
// Unsubscribe reasons -> 2xx codes.
const kUNREGISTER_REASON_TO_CODE = {
[Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
[Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
[Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
};
// Delivery error reasons -> 3xx codes.
const kDELIVERY_REASON_TO_CODE = {
[Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
[Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
[Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
};
// Preferences helper created with the "dom.push." prefix; the pref names
// passed to prefs.get()/set()/observe()/ignore() in this file omit it.
const prefs = new Preferences("dom.push.");
// Symbols exported from this module.
this.EXPORTED_SYMBOLS = ["PushServiceWebSocket"];
// Module-wide logger, registered as a lazy getter. Messages are prefixed with
// "PushServiceWebSocket" and the maximum log level is taken from the
// "dom.push.loglevel" pref.
XPCOMUtils.defineLazyGetter(this, "console", () => {
  let consoleModule = Cu.import("resource://gre/modules/Console.jsm", {});
  let loggerOptions = {
    maxLogLevelPref: "dom.push.loglevel",
    prefix: "PushServiceWebSocket",
  };
  return new consoleModule.ConsoleAPI(loggerOptions);
});
/**
 * Relays WebSocket callbacks to the PushService.
 *
 * A WebSocket can keep delivering messages or errors after it has been
 * closed, and the PushService may no longer care about them. Rather than
 * checking at every call site, the PushService silences the socket by setting
 * `_pushService` on this listener to null; every forwarding callback below
 * then becomes a no-op.
 *
 * @param pushService
 *        The object whose `_wsOn*` handlers receive the forwarded events.
 */
var PushWebSocketListener = function(pushService) {
  this._pushService = pushService;
};

PushWebSocketListener.prototype = {
  /** Forwards the socket-opened event unless the listener was silenced. */
  onStart: function(context) {
    let service = this._pushService;
    if (service) {
      service._wsOnStart(context);
    }
  },

  /** Forwards the socket-stopped event and its status code. */
  onStop: function(context, statusCode) {
    let service = this._pushService;
    if (service) {
      service._wsOnStop(context, statusCode);
    }
  },

  /** Acknowledgements are not used; intentionally does nothing. */
  onAcknowledge: function(context, size) {
    // EMPTY
  },

  /** Binary messages are not used; intentionally does nothing. */
  onBinaryMessageAvailable: function(context, message) {
    // EMPTY
  },

  /** Forwards an incoming text message. */
  onMessageAvailable: function(context, message) {
    let service = this._pushService;
    if (service) {
      service._wsOnMessageAvailable(context, message);
    }
  },

  /** Forwards a server-initiated close with its status code and reason. */
  onServerClose: function(context, aStatusCode, aReason) {
    let service = this._pushService;
    if (service) {
      service._wsOnServerClose(context, aStatusCode, aReason);
    }
  }
};
// Connection states stored in PushServiceWebSocket._currentState.
// No WebSocket is open. This is the initial state, and _shutdownWS() returns
// to it.
const STATE_SHUT_DOWN = 0;
// The WebSocket has been opened on the client side (asyncOpen was issued in
// _beginWSSetup); waiting for the successful open callback (_wsOnStart).
const STATE_WAITING_FOR_WS_START = 1;
// WebSocket opened, hello sent, waiting for server reply (_handleHelloReply).
const STATE_WAITING_FOR_HELLO = 2;
// WebSocket operational, handshake completed, begin protocol messaging.
const STATE_READY = 3;
this.PushServiceWebSocket = {
// The main push service passed to init(); cleared again in uninit().
_mainPushService: null,
// The push server URI passed to init(); _beginWSSetup() connects to it.
_serverURI: null,
newPushDB: function() {
return new PushDB(kPUSHWSDB_DB_NAME,
kPUSHWSDB_DB_VERSION,
kPUSHWSDB_STORE_NAME,
"channelID",
PushRecordWebSocket);
},
serviceType: function() {
return "WebSocket";
},
disconnect: function() {
this._shutdownWS();
},
observe: function(aSubject, aTopic, aData) {
if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
this._onUAIDChanged();
} else if (aTopic == "timer-callback") {
this._onTimerFired(aSubject);
}
},
/**
* Handles a UAID change. Unlike reconnects, we cancel all pending requests
* after disconnecting. Existing subscriptions stored in IndexedDB will be
* dropped on reconnect.
*/
_onUAIDChanged() {
console.debug("onUAIDChanged()");
this._shutdownWS();
this._startBackoffTimer();
},
/** Handles a ping, backoff, or request timeout timer event. */
_onTimerFired(timer) {
console.debug("onTimerFired()");
if (timer == this._pingTimer) {
this._sendPing();
return;
}
if (timer == this._backoffTimer) {
console.debug("onTimerFired: Reconnecting after backoff");
this._beginWSSetup();
return;
}
if (timer == this._requestTimeoutTimer) {
this._timeOutRequests();
return;
}
},
/**
* Sends a ping to the server. Bypasses the request queue, but starts the
* request timeout timer. If the socket is already closed, or the server
* does not respond within the timeout, the client will reconnect.
*/
_sendPing() {
console.debug("sendPing()");
this._startRequestTimeoutTimer();
try {
this._wsSendMessage({});
this._lastPingTime = Date.now();
} catch (e) {
console.debug("sendPing: Error sending ping", e);
this._reconnect();
}
},
/** Times out any pending requests. */
_timeOutRequests() {
console.debug("timeOutRequests()");
if (!this._hasPendingRequests()) {
// Cancel the repeating timer and exit early if we aren't waiting for
// pongs or requests.
this._requestTimeoutTimer.cancel();
return;
}
let now = Date.now();
// Set to true if at least one request timed out, or we're still waiting
// for a pong after the request timeout.
let requestTimedOut = false;
if (this._lastPingTime > 0 &&
now - this._lastPingTime > this._requestTimeout) {
console.debug("timeOutRequests: Did not receive pong in time");
requestTimedOut = true;
} else {
for (let [channelID, request] of this._registerRequests) {
let duration = now - request.ctime;
// If any of the registration requests time out, all the ones after it
// also made to fail, since we are going to be disconnecting the
// socket.
requestTimedOut |= duration > this._requestTimeout;
if (requestTimedOut) {
request.reject(new Error(
"Register request timed out for channel ID " + channelID));
this._registerRequests.delete(channelID);
}
}
}
// The most likely reason for a pong or registration request timing out is
// that the socket has disconnected. Best to reconnect.
if (requestTimedOut) {
this._reconnect();
}
},
validServerURI: function(serverURI) {
if (serverURI.scheme == "ws") {
return !!prefs.get("testing.allowInsecureServerURL");
}
return serverURI.scheme == "wss";
},
get _UAID() {
return prefs.get("userAgentID");
},
set _UAID(newID) {
if (typeof(newID) !== "string") {
console.warn("Got invalid, non-string UAID", newID,
"Not updating userAgentID");
return;
}
console.debug("New _UAID", newID);
prefs.set("userAgentID", newID);
},
/**
 * The WebSocket channel created in _beginWSSetup(), or null when there is
 * none. _shutdownWS() resets it to null.
 */
_ws: null,
/**
 * Pending register requests, keyed by channel ID. Entries are removed when
 * the server replies (_handleRegisterReply) or when the request times out
 * (_timeOutRequests).
 */
_registerRequests: new Map(),
/** One of the STATE_* constants describing the connection state. */
_currentState: STATE_SHUT_DOWN,
/**
 * How long to wait for a pong or register reply, in milliseconds (it is
 * compared against Date.now() differences). Loaded from the
 * "requestTimeout" pref in init().
 */
_requestTimeout: 0,
/**
 * Repeating timer that drives _timeOutRequests(); created lazily by
 * _startRequestTimeoutTimer().
 */
_requestTimeoutTimer: null,
/**
 * Number of backoff timers started so far; used as the exponent when
 * _startBackoffTimer() computes the next retry delay.
 */
_retryFailCount: 0,
/**
 * According to the WS spec, servers should immediately close the underlying
 * TCP connection after they close a WebSocket. This causes wsOnStop to be
 * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
 * WebSocket up, it should try to reconnect. But if the server closes the
 * WebSocket because it wants the client to back off, then the client
 * shouldn't re-establish the connection. If the server sends the backoff
 * close code, this field will be set to true in wsOnServerClose. It is
 * checked in wsOnStop.
 */
_skipReconnect: false,
/** Indicates whether the server supports Web Push-style message delivery. */
_dataEnabled: false,
/**
 * The last time the client sent a ping to the server. If non-zero, keeps the
 * request timeout timer active. Reset to zero when the server responds with
 * a pong or pending messages.
 */
_lastPingTime: 0,
/**
 * A one-shot timer used to ping the server, to avoid timing out idle
 * connections. Reset to the ping interval on each incoming message.
 */
_pingTimer: null,
/** A one-shot timer fired after the reconnect backoff period. */
_backoffTimer: null,
/**
* Sends a message to the Push Server through an open websocket.
* typeof(msg) shall be an object
*/
_wsSendMessage: function(msg) {
if (!this._ws) {
console.warn("wsSendMessage: No WebSocket initialized.",
"Cannot send a message");
return;
}
msg = JSON.stringify(msg);
console.debug("wsSendMessage: Sending message", msg);
this._ws.sendMsg(msg);
},
init: function(options, mainPushService, serverURI) {
console.debug("init()");
this._mainPushService = mainPushService;
this._serverURI = serverURI;
// Override the default WebSocket factory function. The returned object
// must be null or satisfy the nsIWebSocketChannel interface. Used by
// the tests to provide a mock WebSocket implementation.
if (options.makeWebSocket) {
this._makeWebSocket = options.makeWebSocket;
}
this._requestTimeout = prefs.get("requestTimeout");
return Promise.resolve();
},
_reconnect: function () {
console.debug("reconnect()");
this._shutdownWS(false);
this._startBackoffTimer();
},
_shutdownWS: function(shouldCancelPending = true) {
console.debug("shutdownWS()");
this._currentState = STATE_SHUT_DOWN;
this._skipReconnect = false;
prefs.ignore("userAgentID", this);
if (this._wsListener) {
this._wsListener._pushService = null;
}
try {
this._ws.close(0, null);
} catch (e) {}
this._ws = null;
this._lastPingTime = 0;
if (this._pingTimer) {
this._pingTimer.cancel();
}
if (shouldCancelPending) {
this._cancelRegisterRequests();
}
if (this._notifyRequestQueue) {
this._notifyRequestQueue();
this._notifyRequestQueue = null;
}
},
uninit: function() {
// All pending requests (ideally none) are dropped at this point. We
// shouldn't have any applications performing registration/unregistration
// or receiving notifications.
this._shutdownWS();
if (this._backoffTimer) {
this._backoffTimer.cancel();
}
if (this._requestTimeoutTimer) {
this._requestTimeoutTimer.cancel();
}
this._mainPushService = null;
this._dataEnabled = false;
},
/**
* How retries work: If the WS is closed due to a socket error,
* _startBackoffTimer() is called. The retry timer is started and when
* it times out, beginWSSetup() is called again.
*
* If we are in the middle of a timeout (i.e. waiting), but
* a register/unregister is called, we don't want to wait around anymore.
* _sendRequest will automatically call beginWSSetup(), which will cancel the
* timer. In addition since the state will have changed, even if a pending
* timer event comes in (because the timer fired the event before it was
* cancelled), so the connection won't be reset.
*/
_startBackoffTimer() {
console.debug("startBackoffTimer()");
// Calculate new timeout, but cap it to pingInterval.
let retryTimeout = prefs.get("retryBaseInterval") *
Math.pow(2, this._retryFailCount);
retryTimeout = Math.min(retryTimeout, prefs.get("pingInterval"));
this._retryFailCount++;
console.debug("startBackoffTimer: Retry in", retryTimeout,
"Try number", this._retryFailCount);
if (!this._backoffTimer) {
this._backoffTimer = Cc["@mozilla.org/timer;1"]
.createInstance(Ci.nsITimer);
}
this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
},
/** Indicates whether we're waiting for pongs or requests. */
_hasPendingRequests() {
return this._lastPingTime > 0 || this._registerRequests.size > 0;
},
/**
* Starts the request timeout timer unless we're already waiting for a pong
* or register request.
*/
_startRequestTimeoutTimer() {
if (this._hasPendingRequests()) {
return;
}
if (!this._requestTimeoutTimer) {
this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"]
.createInstance(Ci.nsITimer);
}
this._requestTimeoutTimer.init(this,
this._requestTimeout,
Ci.nsITimer.TYPE_REPEATING_SLACK);
},
/** Starts or resets the ping timer. */
_startPingTimer() {
if (!this._pingTimer) {
this._pingTimer = Cc["@mozilla.org/timer;1"]
.createInstance(Ci.nsITimer);
}
this._pingTimer.init(this, prefs.get("pingInterval"),
Ci.nsITimer.TYPE_ONE_SHOT);
},
_makeWebSocket: function(uri) {
if (!prefs.get("connection.enabled")) {
console.warn("makeWebSocket: connection.enabled is not set to true.",
"Aborting.");
return null;
}
if (Services.io.offline) {
console.warn("makeWebSocket: Network is offline.");
return null;
}
let socket = Cc["@mozilla.org/network/protocol;1?name=wss"]
.createInstance(Ci.nsIWebSocketChannel);
socket.initLoadInfo(null, // aLoadingNode
Services.scriptSecurityManager.getSystemPrincipal(),
null, // aTriggeringPrincipal
Ci.nsILoadInfo.SEC_NORMAL,
Ci.nsIContentPolicy.TYPE_WEBSOCKET);
return socket;
},
_beginWSSetup: function() {
console.debug("beginWSSetup()");
if (this._currentState != STATE_SHUT_DOWN) {
console.error("_beginWSSetup: Not in shutdown state! Current state",
this._currentState);
return;
}
// Stop any pending reconnects scheduled for the near future.
if (this._backoffTimer) {
this._backoffTimer.cancel();
}
let uri = this._serverURI;
if (!uri) {
return;
}
let socket = this._makeWebSocket(uri);
if (!socket) {
return;
}
this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
console.debug("beginWSSetup: Connecting to", uri.spec);
this._wsListener = new PushWebSocketListener(this);
this._ws.protocol = "push-notification";
try {
// Grab a wakelock before we open the socket to ensure we don't go to
// sleep before connection the is opened.
this._ws.asyncOpen(uri, uri.spec, 0, this._wsListener, null);
this._acquireWakeLock();
this._currentState = STATE_WAITING_FOR_WS_START;
} catch(e) {
console.error("beginWSSetup: Error opening websocket.",
"asyncOpen failed", e);
this._reconnect();
}
},
connect: function(records) {
console.debug("connect()");
// Check to see if we need to do anything.
if (records.length > 0) {
this._beginWSSetup();
}
},
isConnected: function() {
return !!this._ws;
},
_acquireWakeLock: function() {
if (!AppConstants.MOZ_B2G) {
return;
}
// Disable the wake lock on non-B2G platforms to work around bug 1154492.
if (!this._socketWakeLock) {
console.debug("acquireWakeLock: Acquiring Socket Wakelock");
this._socketWakeLock = gPowerManagerService.newWakeLock("cpu");
}
if (!this._socketWakeLockTimer) {
console.debug("acquireWakeLock: Creating Socket WakeLock Timer");
this._socketWakeLockTimer = Cc["@mozilla.org/timer;1"]
.createInstance(Ci.nsITimer);
}
console.debug("acquireWakeLock: Setting Socket WakeLock Timer");
this._socketWakeLockTimer
.initWithCallback(this._releaseWakeLock.bind(this),
// Allow the same time for socket setup as we do for
// requests after the setup. Fudge it a bit since
// timers can be a little off and we don't want to go
// to sleep just as the socket connected.
this._requestTimeout + 1000,
Ci.nsITimer.TYPE_ONE_SHOT);
},
_releaseWakeLock: function() {
if (!AppConstants.MOZ_B2G) {
return;
}
console.debug("releaseWakeLock: Releasing Socket WakeLock");
if (this._socketWakeLockTimer) {
this._socketWakeLockTimer.cancel();
}
if (this._socketWakeLock) {
this._socketWakeLock.unlock();
this._socketWakeLock = null;
}
},
/**
* Protocol handler invoked by server message.
*/
_handleHelloReply: function(reply) {
console.debug("handleHelloReply()");
if (this._currentState != STATE_WAITING_FOR_HELLO) {
console.error("handleHelloReply: Unexpected state", this._currentState,
"(expected STATE_WAITING_FOR_HELLO)");
this._shutdownWS();
return;
}
if (typeof reply.uaid !== "string") {
console.error("handleHelloReply: Received invalid UAID", reply.uaid);
this._shutdownWS();
return;
}
if (reply.uaid === "") {
console.error("handleHelloReply: Received empty UAID");
this._shutdownWS();
return;
}
// To avoid sticking extra large values sent by an evil server into prefs.
if (reply.uaid.length > 128) {
console.error("handleHelloReply: UAID received from server was too long",
reply.uaid);
this._shutdownWS();
return;
}
let sendRequests = () => {
if (this._notifyRequestQueue) {
this._notifyRequestQueue();
this._notifyRequestQueue = null;
}
this._sendRegisterRequests();
};
function finishHandshake() {
this._UAID = reply.uaid;
this._currentState = STATE_READY;
prefs.observe("userAgentID", this);
this._dataEnabled = !!reply.use_webpush;
if (this._dataEnabled) {
this._mainPushService.getAllUnexpired().then(records =>
Promise.all(records.map(record =>
this._mainPushService.ensureCrypto(record).catch(error => {
console.error("finishHandshake: Error updating record",
record.keyID, error);
})
))
).then(sendRequests);
} else {
sendRequests();
}
}
// By this point we've got a UAID from the server that we are ready to
// accept.
//
// We unconditionally drop all existing registrations and notify service
// workers if we receive a new UAID. This ensures we expunge all stale
// registrations if the `userAgentID` pref is reset.
if (this._UAID != reply.uaid) {
console.debug("handleHelloReply: Received new UAID");
this._mainPushService.dropUnexpiredRegistrations()
.then(finishHandshake.bind(this));
return;
}
// otherwise we are good to go
finishHandshake.bind(this)();
},
/**
* Protocol handler invoked by server message.
*/
_handleRegisterReply: function(reply) {
console.debug("handleRegisterReply()");
if (typeof reply.channelID !== "string" ||
!this._registerRequests.has(reply.channelID)) {
return;
}
let tmp = this._registerRequests.get(reply.channelID);
this._registerRequests.delete(reply.channelID);
if (!this._hasPendingRequests()) {
this._requestTimeoutTimer.cancel();
}
if (reply.status == 200) {
try {
Services.io.newURI(reply.pushEndpoint, null, null);
}
catch (e) {
tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
return;
}
let record = new PushRecordWebSocket({
channelID: reply.channelID,
pushEndpoint: reply.pushEndpoint,
scope: tmp.record.scope,
originAttributes: tmp.record.originAttributes,
version: null,
systemRecord: tmp.record.systemRecord,
appServerKey: tmp.record.appServerKey,
ctime: Date.now(),
});
Services.telemetry.getHistogramById("PUSH_API_SUBSCRIBE_WS_TIME").add(Date.now() - tmp.ctime);
tmp.resolve(record);
} else {
console.error("handleRegisterReply: Unexpected server response", reply);
tmp.reject(new Error("Wrong status code for register reply: " +
reply.status));
}
},
_handleDataUpdate: function(update) {
let promise;
if (typeof update.channelID != "string") {
console.warn("handleDataUpdate: Discarding update without channel ID",
update);
return;
}
if (typeof update.data != "string") {
promise = this._mainPushService.receivedPushMessage(
update.channelID,
update.version,
null,
null,
record => record
);
} else {
let params = getCryptoParams(update.headers);
if (params) {
let message = ChromeUtils.base64URLDecode(update.data, {
// The Push server may append padding.
padding: "ignore",
});
promise = this._mainPushService.receivedPushMessage(
update.channelID,
update.version,
message,
params,
record => record
);
} else {
promise = Promise.reject(new Error("Invalid crypto headers"));
}
}
promise.then(status => {
this._sendAck(update.channelID, update.version, status);
}, err => {
console.error("handleDataUpdate: Error delivering message", update, err);
this._sendAck(update.channelID, update.version,
Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR);
}).catch(err => {
console.error("handleDataUpdate: Error acknowledging message", update,
err);
});
},
/**
* Protocol handler invoked by server message.
*/
_handleNotificationReply: function(reply) {
console.debug("handleNotificationReply()");
if (this._dataEnabled) {
this._handleDataUpdate(reply);
return;
}
if (typeof reply.updates !== 'object') {
console.warn("handleNotificationReply: Missing updates", reply.updates);
return;
}
console.debug("handleNotificationReply: Got updates", reply.updates);
for (let i = 0; i < reply.updates.length; i++) {
let update = reply.updates[i];
console.debug("handleNotificationReply: Handling update", update);
if (typeof update.channelID !== "string") {
console.debug("handleNotificationReply: Invalid update at index",
i, update);
continue;
}
if (update.version === undefined) {
console.debug("handleNotificationReply: Missing version", update);
continue;
}
let version = update.version;
if (typeof version === "string") {
version = parseInt(version, 10);
}
if (typeof version === "number" && version >= 0) {
// FIXME(nsm): this relies on app update notification being infallible!
// eventually fix this
this._receivedUpdate(update.channelID, version);
}
}
},
reportDeliveryError(messageID, reason) {
console.debug("reportDeliveryError()");
let code = kDELIVERY_REASON_TO_CODE[reason];
if (!code) {
throw new Error('Invalid delivery error reason');
}
let data = {messageType: 'nack',
version: messageID,
code: code};
this._queueRequest(data);
},
_sendAck(channelID, version, status) {
console.debug("sendAck()");
let code = kACK_STATUS_TO_CODE[status];
if (!code) {
throw new Error('Invalid ack status');
}
let data = {messageType: 'ack',
updates: [{channelID: channelID,
version: version,
code: code}]};
this._queueRequest(data);
},
_generateID: function() {
let uuidGenerator = Cc["@mozilla.org/uuid-generator;1"]
.getService(Ci.nsIUUIDGenerator);
// generateUUID() gives a UUID surrounded by {...}, slice them off.
return uuidGenerator.generateUUID().toString().slice(1, -1);
},
register(record) {
console.debug("register() ", record);
// start the timer since we now have at least one request
this._startRequestTimeoutTimer();
let data = {channelID: this._generateID(),
messageType: "register"};
if (record.appServerKey) {
data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
// The Push server requires padding.
pad: true,
});
}
return new Promise((resolve, reject) => {
this._registerRequests.set(data.channelID, {
record: record,
resolve: resolve,
reject: reject,
ctime: Date.now(),
});
this._queueRequest(data);
}).then(record => {
if (!this._dataEnabled) {
return record;
}
return PushCrypto.generateKeys()
.then(([publicKey, privateKey]) => {
record.p256dhPublicKey = publicKey;
record.p256dhPrivateKey = privateKey;
record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
return record;
});
});
},
unregister(record, reason) {
console.debug("unregister() ", record, reason);
let code = kUNREGISTER_REASON_TO_CODE[reason];
if (!code) {
return Promise.reject(new Error('Invalid unregister reason'));
}
let data = {channelID: record.channelID,
messageType: "unregister",
code: code};
this._queueRequest(data);
return Promise.resolve();
},
// Already-resolved promise that _enqueue() uses as the head of the chain
// when no queue exists yet.
_queueStart: Promise.resolve(),
// Resolver of the "gate" promise that _queueRequest() puts on the queue
// while the connection is not in STATE_READY; null when no gate is pending.
_notifyRequestQueue: null,
// Tail of the promise chain built by _enqueue(); null until the first
// operation is enqueued.
_queue: null,
_enqueue: function(op) {
console.debug("enqueue()");
if (!this._queue) {
this._queue = this._queueStart;
}
this._queue = this._queue
.then(op)
.catch(_ => {});
},
_send(data) {
if (this._currentState == STATE_READY) {
if (data.messageType != "register" ||
this._registerRequests.has(data.channelID)) {
// check if request has not been cancelled
this._wsSendMessage(data);
}
}
},
_sendRegisterRequests() {
this._enqueue(_ => {
for (let channelID of this._registerRequests.keys()) {
this._send({
messageType: "register",
channelID: channelID,
});
}
});
},
_queueRequest(data) {
if (data.messageType != "register") {
if (this._currentState != STATE_READY && !this._notifyRequestQueue) {
let promise = new Promise((resolve, reject) => {
this._notifyRequestQueue = resolve;
});
this._enqueue(_ => promise);
}
this._enqueue(_ => this._send(data));
} else if (this._currentState == STATE_READY) {
this._send(data);
}
if (!this._ws) {
// This will end up calling notifyRequestQueue().
this._beginWSSetup();
// If beginWSSetup does not succeed to make ws, notifyRequestQueue will
// not be call.
if (!this._ws && this._notifyRequestQueue) {
this._notifyRequestQueue();
this._notifyRequestQueue = null;
}
}
},
_receivedUpdate: function(aChannelID, aLatestVersion) {
console.debug("receivedUpdate: Updating", aChannelID, "->", aLatestVersion);
this._mainPushService.receivedPushMessage(aChannelID, "", null, null, record => {
if (record.version === null ||
record.version < aLatestVersion) {
console.debug("receivedUpdate: Version changed for", aChannelID,
aLatestVersion);
record.version = aLatestVersion;
return record;
}
console.debug("receivedUpdate: No significant version change for",
aChannelID, aLatestVersion);
return null;
}).then(status => {
this._sendAck(aChannelID, aLatestVersion, status);
}).catch(err => {
console.error("receivedUpdate: Error acknowledging message", aChannelID,
aLatestVersion, err);
});
},
// begin Push protocol handshake
_wsOnStart: function(context) {
console.debug("wsOnStart()");
this._releaseWakeLock();
if (this._currentState != STATE_WAITING_FOR_WS_START) {
console.error("wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
"state", this._currentState, "Skipping");
return;
}
let data = {
messageType: "hello",
use_webpush: true,
};
if (this._UAID) {
data.uaid = this._UAID;
}
this._wsSendMessage(data);
this._currentState = STATE_WAITING_FOR_HELLO;
},
/**
* This statusCode is not the websocket protocol status code, but the TCP
* connection close status code.
*
* If we do not explicitly call ws.close() then statusCode is always
* NS_BASE_STREAM_CLOSED, even on a successful close.
*/
_wsOnStop: function(context, statusCode) {
console.debug("wsOnStop()");
this._releaseWakeLock();
if (statusCode != Cr.NS_OK && !this._skipReconnect) {
console.debug("wsOnStop: Reconnecting after socket error", statusCode);
this._reconnect();
return;
}
this._shutdownWS();
},
_wsOnMessageAvailable: function(context, message) {
console.debug("wsOnMessageAvailable()", message);
// Clearing the last ping time indicates we're no longer waiting for a pong.
this._lastPingTime = 0;
let reply;
try {
reply = JSON.parse(message);
} catch(e) {
console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
return;
}
// If we receive a message, we know the connection succeeded. Reset the
// connection attempt and ping interval counters.
this._retryFailCount = 0;
let doNotHandle = false;
if ((message === '{}') ||
(reply.messageType === undefined) ||
(reply.messageType === "ping") ||
(typeof reply.messageType != "string")) {
console.debug("wsOnMessageAvailable: Pong received");
doNotHandle = true;
}
// Reset the ping timer. Note: This path is executed at every step of the
// handshake, so this timer does not need to be set explicitly at startup.
this._startPingTimer();
// If it is a ping, do not handle the message.
if (doNotHandle) {
return;
}
// A whitelist of protocol handlers. Add to these if new messages are added
// in the protocol.
let handlers = ["Hello", "Register", "Notification"];
// Build up the handler name to call from messageType.
// e.g. messageType == "register" -> _handleRegisterReply.
let handlerName = reply.messageType[0].toUpperCase() +
reply.messageType.slice(1).toLowerCase();
if (handlers.indexOf(handlerName) == -1) {
console.warn("wsOnMessageAvailable: No whitelisted handler", handlerName,
"for message", reply.messageType);
return;
}
let handler = "_handle" + handlerName + "Reply";
if (typeof this[handler] !== "function") {
console.warn("wsOnMessageAvailable: Handler", handler,
"whitelisted but not implemented");
return;
}
this[handler](reply);
},
/**
* The websocket should never be closed. Since we don't call ws.close(),
* _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
* function), which calls reconnect and re-establishes the WebSocket
* connection.
*
* If the server requested that we back off, we won't reconnect until the
* next network state change event, or until we need to send a new register
* request.
*/
_wsOnServerClose: function(context, aStatusCode, aReason) {
console.debug("wsOnServerClose()", aStatusCode, aReason);
if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
console.debug("wsOnServerClose: Skipping automatic reconnect");
this._skipReconnect = true;
}
},
/**
* Rejects all pending register requests with errors.
*/
_cancelRegisterRequests: function() {
for (let request of this._registerRequests.values()) {
request.reject(new Error("Register request aborted"));
}
this._registerRequests.clear();
},
};
/**
 * Push record for the WebSocket backend. Runs the base PushRecord
 * initializer on `this`, then copies the WebSocket-specific `channelID` and
 * `version` fields from `record`.
 *
 * @param {Object} record Source properties for the new record.
 */
function PushRecordWebSocket(record) {
  PushRecord.call(this, record);
  let {channelID, version} = record;
  this.channelID = channelID;
  this.version = version;
}
// Inherit from PushRecord and add a `keyID` accessor.
PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
// For WebSocket records the key ID is the channel ID.
keyID: {
get() {
return this.channelID;
},
},
});
/**
 * Returns the subscription object built by PushRecord's toSubscription(),
 * with this record's `version` added to it.
 */
PushRecordWebSocket.prototype.toSubscription = function() {
  let baseSubscription = PushRecord.prototype.toSubscription.call(this);
  baseSubscription.version = this.version;
  return baseSubscription;
};