Files
basilisk55/netwerk/test/httpserver/test/test_async_response_sending.js
T

1683 lines
52 KiB
JavaScript

/* -*- indent-tabs-mode: nil; js-indent-level: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/*
* Ensures that data a request handler writes out in response is sent only as
* quickly as the client can receive it, without racing ahead and being forced
* to block while writing that data.
*
* NB: These tests are extremely tied to the current implementation, in terms of
* when and how stream-ready notifications occur, the amount of data which will
* be read or written at each notification, and so on. If the implementation
* changes in any way with respect to stream copying, this test will probably
* have to change a little at the edges as well.
*/
// Creates an instance of the thread-manager component and stores it in
// gThreadManager.
// NOTE(review): gThreadManager is not declared anywhere in this file;
// presumably it is declared by the server code loaded alongside this test --
// confirm, otherwise this assignment creates an implicit global.
gThreadManager = Cc["@mozilla.org/thread-manager;1"].createInstance();
/**
 * Test entry point (presumably invoked by the test harness): marks the test
 * as pending, appends a final sentinel entry to |tests| which prints a
 * completion banner and finishes the test, and then starts running the tests.
 */
function run_test()
{
  do_test_pending();

  var banner = ["******************",
                "* TESTS COMPLETE *",
                "******************"].join("\n");

  // Runs after every real test; it never calls its continuation, which is
  // what ends the chain started by runNextTest().
  function testsComplete(_)
  {
    dumpn(banner);
    do_test_finished();
  }
  tests.push(testsComplete);

  runNextTest();
}
/**
 * Advances testIndex and runs the test at that position in |tests|, passing
 * this function to it as the continuation to call when the test is done.
 *
 * Any exception thrown synchronously by the test is reported via do_throw,
 * including its stack when the thrown value carries one.
 */
function runNextTest()
{
  testIndex++;
  dumpn("*** runNextTest(), testIndex: " + testIndex);

  try
  {
    var test = tests[testIndex];
    test(runNextTest);
  }
  catch (e)
  {
    var msg = "exception running test " + testIndex + ": " + e;

    // The mock streams in this file throw bare nsresult numbers.  Applying the
    // |in| operator to a primitive throws a TypeError, which would replace the
    // real failure with a confusing one, so only probe objects for a stack.
    var canHaveStack = e !== null &&
                       (typeof e === "object" || typeof e === "function");
    if (canHaveStack && "stack" in e)
      msg += "\nstack follows:\n" + e.stack;

    do_throw(msg);
  }
}
/*************
* TEST DATA *
*************/
// Byte arrays used both as data fed to the copier's source and as the data
// quantums expected to arrive at its sink.  A full "segment" here is four
// bytes; Response.SEGMENT_SIZE is set to SEGMENT.length later in this file.
const NOTHING = [];
const FIRST_SEGMENT = [1, 2, 3, 4];
const SECOND_SEGMENT = [5, 6, 7, 8];
const THIRD_SEGMENT = [9, 10, 11, 12];
// NB: SEGMENT is the very same array object as FIRST_SEGMENT, not a copy.
const SEGMENT = FIRST_SEGMENT;
// Concatenations of the first two / first three segments.
const TWO_SEGMENTS = [1, 2, 3, 4, 5, 6, 7, 8];
const THREE_SEGMENTS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
const SEGMENT_AND_HALF = [1, 2, 3, 4, 5, 6];
// Pieces of the first segment.
const QUARTER_SEGMENT = [1];
const HALF_SEGMENT = [1, 2];
const SECOND_HALF_SEGMENT = [3, 4];
const THREE_QUARTER_SEGMENT = [1, 2, 3];
// First half of the second segment.
const EXTRA_HALF_SEGMENT = [5, 6];
const MIDDLE_HALF_SEGMENT = [2, 3];
const LAST_QUARTER_SEGMENT = [4];
// Second half of the second segment, then the two halves of the third.
const FOURTH_HALF_SEGMENT = [7, 8];
const HALF_THIRD_SEGMENT = [9, 10];
const LATTER_HALF_THIRD_SEGMENT = [11, 12];
// HALF_SEGMENT repeated twice.
const TWO_HALF_SEGMENTS = [1, 2, 1, 2];
/*********
* TESTS *
*********/
// The test functions, in execution order.  Each takes a single argument (the
// continuation which runs the next test) and hands it to a CopyTest.
// run_test() appends a final completion entry to this array at startup.
var tests =
[
sourceClosedWithoutWrite,
writeOneSegmentThenClose,
simpleWriteThenRead,
writeLittleBeforeReading,
writeMultipleSegmentsThenRead,
writeLotsBeforeReading,
writeLotsBeforeReading2,
writeThenReadPartial,
manyPartialWrites,
partialRead,
partialWrite,
sinkClosedImmediately,
sinkClosedWithReadableData,
sinkClosedAfterWrite,
sourceAndSinkClosed,
sinkAndSourceClosed,
sourceAndSinkClosedWithPendingData,
sinkAndSourceClosedWithPendingData,
];
// Index of the test currently running.  Starts at -1 because runNextTest()
// increments it before indexing into |tests|.
var testIndex = -1;
// Scenario: the source is closed with NS_OK before any data is added to it.
// Expected: final status NS_OK, with NOTHING as the only expected data entry.
function sourceClosedWithoutWrite(next)
{
var t = new CopyTest("sourceClosedWithoutWrite", next);
t.closeSource(Cr.NS_OK);
t.expect(Cr.NS_OK, [NOTHING]);
}
/**
 * Scenario: one segment is added to the source and made readable, the source
 * is then closed with NS_OK, and only afterwards is the sink given room for
 * the segment.  Expected: the segment arrives at the sink and the final status
 * is NS_OK.
 *
 * @param next : function
 *   continuation handed to the CopyTest, called when this test completes
 */
function writeOneSegmentThenClose(next)
{
  // The name passed to CopyTest labels the pipes in debugging output, so it
  // must identify this test (it previously reused "writeLittleBeforeReading").
  var t = new CopyTest("writeOneSegmentThenClose", next);

  t.addToSource(SEGMENT);
  t.makeSourceReadable(SEGMENT.length);
  t.closeSource(Cr.NS_OK);
  t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
  t.expect(Cr.NS_OK, [SEGMENT]);
}
// Scenario: one segment is added to the source and made readable, the sink is
// given room for it and the segment is awaited, and only then is the source
// closed with NS_OK.
// Expected: final status NS_OK with the single segment as data.
function simpleWriteThenRead(next)
{
var t = new CopyTest("simpleWriteThenRead", next);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
t.closeSource(Cr.NS_OK);
t.expect(Cr.NS_OK, [SEGMENT]);
}
// Scenario: two segments are added to the source one at a time and the source
// is closed before the sink has any room; the sink is then made writable one
// segment at a time.
// Expected: each grant of sink space yields one segment; final status NS_OK.
function writeLittleBeforeReading(next)
{
var t = new CopyTest("writeLittleBeforeReading", next);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.closeSource(Cr.NS_OK);
t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
t.expect(Cr.NS_OK, [SEGMENT, SEGMENT]);
}
// Scenario: two segments' worth of data is added to the source in one go and
// the sink is given room for all of it at once.
// Expected: the data reaches the sink as two separate segment-sized quantums
// (FIRST_SEGMENT, then SECOND_SEGMENT); final status NS_OK.
function writeMultipleSegmentsThenRead(next)
{
var t = new CopyTest("writeMultipleSegmentsThenRead", next);
t.addToSource(TWO_SEGMENTS);
t.makeSourceReadable(TWO_SEGMENTS.length);
t.makeSinkWritableAndWaitFor(TWO_SEGMENTS.length,
[FIRST_SEGMENT, SECOND_SEGMENT]);
t.closeSource(Cr.NS_OK);
t.expect(Cr.NS_OK, [TWO_SEGMENTS]);
}
// Scenario: the source stays ahead of the sink.  Two segments are added up
// front; after each single-segment grant of sink space another segment is
// added to the source.  The source is then closed and the sink is given room
// for the two segments still outstanding.
// Expected: FIRST_SEGMENT, SECOND_SEGMENT, then two SEGMENTs; status NS_OK.
function writeLotsBeforeReading(next)
{
var t = new CopyTest("writeLotsBeforeReading", next);
t.addToSource(TWO_SEGMENTS);
t.makeSourceReadable(TWO_SEGMENTS.length);
t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.makeSinkWritableAndWaitFor(SECOND_SEGMENT.length, [SECOND_SEGMENT]);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.closeSource(Cr.NS_OK);
t.makeSinkWritableAndWaitFor(2 * SEGMENT.length, [SEGMENT, SEGMENT]);
t.expect(Cr.NS_OK, [TWO_SEGMENTS, SEGMENT, SEGMENT]);
}
/**
 * Scenario: like writeLotsBeforeReading, but starting with three segments in
 * the source.  After each of the first two single-segment grants of sink
 * space another segment is added to the source; the third segment is then
 * drained, the source is closed with NS_OK, and finally the sink is given
 * room for the two segments still outstanding.  Expected: FIRST_SEGMENT,
 * SECOND_SEGMENT, THIRD_SEGMENT, then two SEGMENTs; final status NS_OK.
 *
 * @param next : function
 *   continuation handed to the CopyTest, called when this test completes
 */
function writeLotsBeforeReading2(next)
{
  // The name passed to CopyTest labels the pipes in debugging output, so it
  // must distinguish this test from writeLotsBeforeReading (it previously
  // reused that test's name).
  var t = new CopyTest("writeLotsBeforeReading2", next);

  t.addToSource(THREE_SEGMENTS);
  t.makeSourceReadable(THREE_SEGMENTS.length);
  t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]);

  t.addToSource(SEGMENT);
  t.makeSourceReadable(SEGMENT.length);
  t.makeSinkWritableAndWaitFor(SECOND_SEGMENT.length, [SECOND_SEGMENT]);

  t.addToSource(SEGMENT);
  t.makeSourceReadable(SEGMENT.length);
  t.makeSinkWritableAndWaitFor(THIRD_SEGMENT.length, [THIRD_SEGMENT]);

  t.closeSource(Cr.NS_OK);
  t.makeSinkWritableAndWaitFor(2 * SEGMENT.length, [SEGMENT, SEGMENT]);
  t.expect(Cr.NS_OK, [THREE_SEGMENTS, SEGMENT, SEGMENT]);
}
// Scenario: one and a half segments are added to the source; the sink first
// gets room for a full segment, then the source is closed, then the sink gets
// room for the remaining half segment.
// Expected: SEGMENT, then EXTRA_HALF_SEGMENT; final status NS_OK.
function writeThenReadPartial(next)
{
var t = new CopyTest("writeThenReadPartial", next);
t.addToSource(SEGMENT_AND_HALF);
t.makeSourceReadable(SEGMENT_AND_HALF.length);
t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
t.closeSource(Cr.NS_OK);
t.makeSinkWritableAndWaitFor(EXTRA_HALF_SEGMENT.length, [EXTRA_HALF_SEGMENT]);
t.expect(Cr.NS_OK, [SEGMENT_AND_HALF]);
}
// Scenario: two half segments are added to the source separately before the
// sink is given room for a full segment.
// Expected: both halves arrive together as one quantum (TWO_HALF_SEGMENTS);
// final status NS_OK.
function manyPartialWrites(next)
{
var t = new CopyTest("manyPartialWrites", next);
t.addToSource(HALF_SEGMENT);
t.makeSourceReadable(HALF_SEGMENT.length);
t.addToSource(HALF_SEGMENT);
t.makeSourceReadable(HALF_SEGMENT.length);
t.makeSinkWritableAndWaitFor(2 * HALF_SEGMENT.length, [TWO_HALF_SEGMENTS]);
t.closeSource(Cr.NS_OK);
t.expect(Cr.NS_OK, [TWO_HALF_SEGMENTS]);
}
// Scenario: a full segment and then a half segment are added to the source;
// the sink gets room for the full segment first, and the trailing half
// segment is awaited as part of closing the source (closeSourceAndWaitFor).
// Expected: SEGMENT, then HALF_SEGMENT; final status NS_OK.
function partialRead(next)
{
var t = new CopyTest("partialRead", next);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.addToSource(HALF_SEGMENT);
t.makeSourceReadable(HALF_SEGMENT.length);
t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]);
t.closeSourceAndWaitFor(Cr.NS_OK, HALF_SEGMENT.length, [HALF_SEGMENT]);
t.expect(Cr.NS_OK, [SEGMENT, HALF_SEGMENT]);
}
// Scenario: the sink accepts data only in the given increments, so each
// segment the copier tries to write is split across several partial writes.
// Three rounds are run: one segment split 1/2/1 bytes, one segment split
// 2/2 bytes, and three segments split 2/2/4/2/2 bytes.
// Expected: the quantums arrive exactly as split; final status NS_OK.
function partialWrite(next)
{
var t = new CopyTest("partialWrite", next);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.makeSinkWritableByIncrementsAndWaitFor(SEGMENT.length,
[QUARTER_SEGMENT,
MIDDLE_HALF_SEGMENT,
LAST_QUARTER_SEGMENT]);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.makeSinkWritableByIncrementsAndWaitFor(SEGMENT.length,
[HALF_SEGMENT, SECOND_HALF_SEGMENT]);
t.addToSource(THREE_SEGMENTS);
t.makeSourceReadable(THREE_SEGMENTS.length);
t.makeSinkWritableByIncrementsAndWaitFor(THREE_SEGMENTS.length,
[HALF_SEGMENT, SECOND_HALF_SEGMENT,
SECOND_SEGMENT,
HALF_THIRD_SEGMENT,
LATTER_HALF_THIRD_SEGMENT]);
t.closeSource(Cr.NS_OK);
t.expect(Cr.NS_OK, [SEGMENT, SEGMENT, THREE_SEGMENTS]);
}
// Scenario: the sink is closed with NS_OK before any data exists.
// Expected: final status NS_ERROR_UNEXPECTED and no data.
function sinkClosedImmediately(next)
{
var t = new CopyTest("sinkClosedImmediately", next);
t.closeSink(Cr.NS_OK);
t.expect(Cr.NS_ERROR_UNEXPECTED, [NOTHING]);
}
// Scenario: a segment is readable in the source, but the sink is closed with
// NS_OK before it is ever made writable.
// Expected: final status NS_ERROR_UNEXPECTED and no data.
function sinkClosedWithReadableData(next)
{
var t = new CopyTest("sinkClosedWithReadableData", next);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.closeSink(Cr.NS_OK);
t.expect(Cr.NS_ERROR_UNEXPECTED, [NOTHING]);
}
// Scenario: two segments are readable in the source; the sink accepts the
// first and is then closed with NS_OK while the second is still outstanding.
// Expected: final status NS_ERROR_UNEXPECTED with only FIRST_SEGMENT as data.
function sinkClosedAfterWrite(next)
{
var t = new CopyTest("sinkClosedAfterWrite", next);
t.addToSource(TWO_SEGMENTS);
t.makeSourceReadable(TWO_SEGMENTS.length);
t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]);
t.closeSink(Cr.NS_OK);
t.expect(Cr.NS_ERROR_UNEXPECTED, [FIRST_SEGMENT]);
}
// Scenario: with no data involved, the source and then the sink are closed
// (both with NS_OK) in a single task.
// Expected: final status NS_OK and an empty list of data.
function sourceAndSinkClosed(next)
{
var t = new CopyTest("sourceAndSinkClosed", next);
t.closeSourceThenSink(Cr.NS_OK, Cr.NS_OK);
t.expect(Cr.NS_OK, []);
}
// Scenario: with no data involved, the sink and then the source are closed
// (both with NS_OK) in a single task.
// Expected: final status NS_ERROR_UNEXPECTED and an empty list of data.
function sinkAndSourceClosed(next)
{
var t = new CopyTest("sinkAndSourceClosed", next);
t.closeSinkThenSource(Cr.NS_OK, Cr.NS_OK);
// sink notify received first, hence error
t.expect(Cr.NS_ERROR_UNEXPECTED, []);
}
// Scenario: a segment is readable in the source but the sink never gets room
// for it; the source and then the sink are closed (both with NS_OK).
// Expected: final status NS_ERROR_UNEXPECTED and an empty list of data.
function sourceAndSinkClosedWithPendingData(next)
{
var t = new CopyTest("sourceAndSinkClosedWithPendingData", next);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.closeSourceThenSink(Cr.NS_OK, Cr.NS_OK);
// not all data from source copied, so error
t.expect(Cr.NS_ERROR_UNEXPECTED, []);
}
// Scenario: a segment is readable in the source but the sink never gets room
// for it; the sink and then the source are closed (both with NS_OK).
// Expected: final status NS_ERROR_UNEXPECTED and an empty list of data.
function sinkAndSourceClosedWithPendingData(next)
{
var t = new CopyTest("sinkAndSourceClosedWithPendingData", next);
t.addToSource(SEGMENT);
t.makeSourceReadable(SEGMENT.length);
t.closeSinkThenSource(Cr.NS_OK, Cr.NS_OK);
// not all data from source copied, plus sink notify received first, so error
t.expect(Cr.NS_ERROR_UNEXPECTED, []);
}
/*************
* UTILITIES *
*************/
/** Returns the sum of the elements in arr. */
/**
 * Adds up arr[0] through arr[arr.length - 1], in index order, starting from
 * zero.
 *
 * @param arr : [number]
 *   the values to total
 * @returns number
 *   the total, or 0 when arr is empty
 */
function sum(arr)
{
  var total = 0;
  var count = arr.length;
  var index = 0;
  while (index < count)
  {
    total += arr[index];
    index++;
  }
  return total;
}
/**
* Returns a constructor for an input or output stream callback that will wrap
* the one provided to it as an argument.
*
* @param wrapperCallback : (nsIInputStreamCallback | nsIOutputStreamCallback) : void
* the callback object (not a function!) to invoke after each wrapped
* (original) callback has been invoked
* @param name : string
* either "onInputStreamReady" if we're wrapping an input stream callback or
* "onOutputStreamReady" if we're wrapping an output stream callback
* @returns function(nsIInputStreamCallback | nsIOutputStreamCallback) : (nsIInputStreamCallback | nsIOutputStreamCallback)
* a constructor function which constructs a callback object (not function!)
* which, when called, first calls the original callback provided to it and
* then calls wrapperCallback
*/
/**
 * Builds a constructor for callback objects which wrap another callback
 * object.  An instance exposes the wrapped object as |wrappedCallback| and has
 * a single method, called |name|, which forwards the stream first to the
 * wrapped object's |name| method and then -- even if that call threw -- to
 * wrapperCallback's |name| method.  An exception from the wrapped callback is
 * logged and propagates to the caller after wrapperCallback has been called.
 */
function createStreamReadyInterceptor(wrapperCallback, name)
{
  function StreamReadyInterceptor(callback)
  {
    var original = callback;

    this.wrappedCallback = original;
    this[name] = function streamReadyInterceptor(stream)
    {
      dumpn("*** StreamReadyInterceptor." + name);

      try
      {
        dumpn("*** calling original " + name + "...");
        original[name](stream);
      }
      catch (ex)
      {
        dumpn("!!! error running inner callback: " + ex);
        throw ex;
      }
      finally
      {
        // Runs whether or not the original callback succeeded.
        dumpn("*** calling wrapper " + name + "...");
        wrapperCallback[name](stream);
      }
    };
  }

  return StreamReadyInterceptor;
}
/**
* Print out a banner with the given message, uppercased, for debugging
* purposes.
*/
/**
 * Dumps the given message, uppercased and framed as "* MESSAGE *", between
 * two lines of asterisks of the same width as the framed message.
 */
function note(m)
{
  var framed = "* " + m.toUpperCase() + " *";
  var border = "*".repeat(framed.length);
  dumpn([border, framed, border].join("\n"));
}
/***********
* MOCKERY *
***********/
/*
* Blatantly violate abstractions in the name of testability. THIS IS NOT
* PUBLIC API! If you use any of these I will knowingly break your code by
* changing the names of variables and properties.
*/
// Replace the binary stream wrappers with identity functions, so "wrapping" a
// stream simply hands back the stream it was given.
var BinaryInputStream = function BIS(stream) { return stream; };
var BinaryOutputStream = function BOS(stream) { return stream; };
// Use the length of the test SEGMENT (four bytes) as the response segment
// size.
Response.SEGMENT_SIZE = SEGMENT.length;
/**
* Roughly mocks an nsIPipe, presenting non-blocking input and output streams
* that appear to also be binary streams and whose readability and writability
* amounts are configurable. Only the methods used in this test have been
* implemented -- these aren't exact mocks (can't be, actually, because input
* streams have unscriptable methods).
*
* @param name : string
* a name for this pipe, used in debugging output
*/
function CustomPipe(name)
{
var self = this;
/** Data read from input that's buffered until it can be written to output. */
this._data = [];
/**
* The status of this pipe, which is to say the error result the ends of this
* pipe will return when attempts are made to use them. This value is always
* an error result when copying has finished, because success codes are
* converted to NS_BASE_STREAM_CLOSED.
*/
this._status = Cr.NS_OK;
/** The input end of this pipe. */
var input = this.inputStream =
{
/** A name for this stream, used in debugging output. */
name: name + " input",
/**
* The number of bytes of data available to be read from this pipe, or
* Infinity if any amount of data in this pipe is made readable as soon as
* it is written to the pipe output.
*/
_readable: 0,
/**
* Data regarding a pending stream-ready callback on this, or null if no
* callback is currently waiting to be called.
*/
_waiter: null,
/**
* The event currently dispatched to make a stream-ready callback, if any
* such callback is currently ready to be made and not already in
* progress, or null when no callback is waiting to happen.
*/
_event: null,
/**
* A stream-ready constructor to wrap an existing callback to intercept
* stream-ready notifications, or null if notifications shouldn't be
* wrapped at all.
*/
_streamReadyInterceptCreator: null,
/**
* Registers a stream-ready wrapper creator function so that a
* stream-ready callback made in the future can be wrapped.
*/
interceptStreamReadyCallbacks: function(streamReadyInterceptCreator)
{
dumpn("*** [" + this.name + "].interceptStreamReadyCallbacks");
// Only one interceptor may be registered at a time.
do_check_true(this._streamReadyInterceptCreator === null,
"intercepting twice");
this._streamReadyInterceptCreator = streamReadyInterceptCreator;
// A callback which was registered before the interceptor was installed is
// wrapped retroactively, so its pending notification is intercepted too.
if (this._waiter)
{
this._waiter.callback =
new streamReadyInterceptCreator(this._waiter.callback);
}
},
/**
* Removes a previously-registered stream-ready wrapper creator function,
* also clearing any current wrapping.
*/
removeStreamReadyInterceptor: function()
{
dumpn("*** [" + this.name + "].removeStreamReadyInterceptor()");
do_check_true(this._streamReadyInterceptCreator !== null,
"removing interceptor when none present?");
this._streamReadyInterceptCreator = null;
// Unwrap any pending callback; this relies on the wrapper exposing the
// original callback as its wrappedCallback property.
if (this._waiter)
this._waiter.callback = this._waiter.callback.wrappedCallback;
},
//
// see nsIAsyncInputStream.asyncWait
//
asyncWait: function asyncWait(callback, flags, requestedCount, target)
{
dumpn("*** [" + this.name + "].asyncWait");
// The callback must be a callback object, not a bare function.
do_check_true(callback && typeof callback !== "function");
var closureOnly =
(flags & Ci.nsIAsyncInputStream.WAIT_CLOSURE_ONLY) !== 0;
// An existing wait may only be replaced when a closure-only wait is being
// superseded by a wait for data.
do_check_true(this._waiter === null ||
(this._waiter.closureOnly && !closureOnly),
"asyncWait already called with a non-closure-only " +
"callback? unexpected!");
// Record the wait, wrapping the callback if an interceptor is registered.
this._waiter =
{
callback:
this._streamReadyInterceptCreator
? new this._streamReadyInterceptCreator(callback)
: callback,
closureOnly: closureOnly,
requestedCount: requestedCount,
eventTarget: target
};
// Notify right away if the pipe is already closed, or if this is a wait for
// data and the requested amount is both buffered and marked readable.
if (!Components.isSuccessCode(self._status) ||
(!closureOnly && this._readable >= requestedCount &&
self._data.length >= requestedCount))
{
this._notify();
}
},
//
// see nsIAsyncInputStream.closeWithStatus
//
closeWithStatus: function closeWithStatus(status)
{
dumpn("*** [" + this.name + "].closeWithStatus" +
"(" + status + ")");
// The pipe's status is shared by both ends; once it holds a failure code the
// pipe is closed and further closures are ignored.
if (!Components.isSuccessCode(self._status))
{
dumpn("*** ignoring second closure of [input " + this.name + "] " +
"(status " + self._status + ")");
return;
}
// Closing with a success code is recorded as NS_BASE_STREAM_CLOSED.
if (Components.isSuccessCode(status))
status = Cr.NS_BASE_STREAM_CLOSED;
self._status = status;
// Wake up whoever is waiting on either end of the pipe, this end first.
if (this._waiter)
this._notify();
if (output._waiter)
output._notify();
},
//
// see nsIBinaryInputStream.readByteArray
//
readByteArray: function readByteArray(count)
{
dumpn("*** [" + this.name + "].readByteArray(" + count + ")");
// With nothing buffered, an open pipe would block and a closed pipe reports
// its closure status.  Note that bare nsresult values are thrown.
if (self._data.length === 0)
{
throw Components.isSuccessCode(self._status)
? Cr.NS_BASE_STREAM_WOULD_BLOCK
: self._status;
}
do_check_true(this._readable <= self._data.length ||
this._readable === Infinity,
"consistency check");
// Reads are all-or-nothing: the full count must be both buffered and marked
// readable.
if (this._readable < count || self._data.length < count)
throw Cr.NS_BASE_STREAM_WOULD_BLOCK;
this._readable -= count;
// Remove the bytes from the front of the buffer and return them.
return self._data.splice(0, count);
},
/**
* Makes the given number of additional bytes of data previously written
* to the pipe's output stream available for reading, triggering future
* notifications when required.
*
* @param count : uint
* the number of bytes of additional data to make available; must not be
* greater than the number of bytes already buffered but not made
* available by previous makeReadable calls
*/
makeReadable: function makeReadable(count)
{
dumpn("*** [" + this.name + "].makeReadable(" + count + ")");
// Only legal while the pipe is still open.
do_check_true(Components.isSuccessCode(self._status), "errant call");
do_check_true(this._readable + count <= self._data.length ||
this._readable === Infinity,
"increasing readable beyond written amount");
this._readable += count;
dumpn("readable: " + this._readable + ", data: " + self._data);
// Notify a pending waiter once its requested amount has become readable;
// closure-only waiters are not interested in data.
var waiter = this._waiter;
if (waiter !== null)
{
if (waiter.requestedCount <= this._readable && !waiter.closureOnly)
this._notify();
}
},
/**
* Disables the readability limit on this stream, meaning that as soon as
* *any* amount of data is written to output it becomes available from
* this stream and a stream-ready event is dispatched (if any stream-ready
* callback is currently set).
*/
disableReadabilityLimit: function disableReadabilityLimit()
{
dumpn("*** [" + this.name + "].disableReadabilityLimit()");
// Infinity is the sentinel the other methods of this pipe check for to mean
// "everything buffered is readable".
this._readable = Infinity;
},
//
// see nsIInputStream.available
//
available: function available()
{
dumpn("*** [" + this.name + "].available()");
// A closed pipe only reports its closure status once its buffer is empty.
if (self._data.length === 0 && !Components.isSuccessCode(self._status))
throw self._status;
// The amount available is capped both by what is buffered and by what has
// been marked readable.
return Math.min(this._readable, self._data.length);
},
/**
* Dispatches a pending stream-ready event ahead of schedule, rather than
* waiting for it to be dispatched in response to normal writes. This is
* useful when writing to the output has completed, and we need to have
* read all data written to this stream. If the output isn't closed and
* the reading of data from this races ahead of the last write to output,
* we need a notification to know when everything that's been written has
* been read. This ordinarily might be supplied by closing output, but
* in some cases it's not desirable to close output, so this supplies an
* alternative method to get notified when the last write has occurred.
*/
maybeNotifyFinally: function maybeNotifyFinally()
{
dumpn("*** [" + this.name + "].maybeNotifyFinally()");
// Callers must have a stream-ready callback registered already.
do_check_true(this._waiter !== null, "must be waiting now");
// Nothing is done while data remains buffered in the pipe.
if (self._data.length > 0)
{
dumpn("*** data still pending, normal notifications will signal " +
"completion");
return;
}
// No data waiting to be written, so notify. We could just close the
// stream, but that's less faithful to the server's behavior (it doesn't
// close the stream, and we're pretending to impersonate the server as
// much as we can here), so instead we're going to notify when no data
// can be read. The CopyTest has already been flagged as complete, so
// the stream listener will detect that this is a wrap-it-up notify and
// invoke the next test.
this._notify();
},
/**
* Dispatches an event to call a previously-registered stream-ready
* callback.
*/
_notify: function _notify()
{
dumpn("*** [" + this.name + "]._notify()");
var waiter = this._waiter;
do_check_true(waiter !== null, "no waiter?");
// At most one event is in flight at a time; calls made while an event is
// already dispatched do nothing further.
if (this._event === null)
{
var event = this._event =
{
run: function run()
{
// Clear the wait before calling out, so the callback is free to call
// asyncWait again.  The waiter captured when the event was created is
// the one notified, even if this._waiter has been replaced since.
input._waiter = null;
input._event = null;
try
{
do_check_true(!Components.isSuccessCode(self._status) ||
input._readable >= waiter.requestedCount);
waiter.callback.onInputStreamReady(input);
}
catch (e)
{
do_throw("error calling onInputStreamReady: " + e);
}
}
};
waiter.eventTarget.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL);
}
},
// Returns this object for nsIAsyncInputStream, nsIInputStream and nsISupports;
// throws NS_ERROR_NO_INTERFACE for any other interface.
QueryInterface: function QueryInterface(iid)
{
if (iid.equals(Ci.nsIAsyncInputStream) ||
iid.equals(Ci.nsIInputStream) ||
iid.equals(Ci.nsISupports))
{
return this;
}
throw Cr.NS_ERROR_NO_INTERFACE;
}
};
/** The output end of this pipe. */
var output = this.outputStream =
{
/** A name for this stream, used in debugging output. */
name: name + " output",
/**
* The number of bytes of data which may be written to this pipe without
* blocking.
*/
_writable: 0,
/**
* The increments in which pending data should be written, rather than
* simply defaulting to the amount requested (which, given that
* input.asyncWait precisely respects the requestedCount argument, will
* ordinarily always be writable in that amount), as an array whose
* elements from start to finish are the number of bytes to write each
* time write() or writeByteArray() is subsequently called. The sum of
* the values in this array, if this array is not empty, is always equal
* to this._writable.
*/
_writableAmounts: [],
/**
* Data regarding a pending stream-ready callback on this, or null if no
* callback is currently waiting to be called.
*/
_waiter: null,
/**
* The event currently dispatched to make a stream-ready callback, if any
* such callback is currently ready to be made and not already in
* progress, or null when no callback is waiting to happen.
*/
_event: null,
/**
* A stream-ready constructor to wrap an existing callback to intercept
* stream-ready notifications, or null if notifications shouldn't be
* wrapped at all.
*/
_streamReadyInterceptCreator: null,
/**
* Registers a stream-ready wrapper creator function so that a
* stream-ready callback made in the future can be wrapped.
*/
interceptStreamReadyCallbacks: function(streamReadyInterceptCreator)
{
dumpn("*** [" + this.name + "].interceptStreamReadyCallbacks");
do_check_true(this._streamReadyInterceptCreator !== null,
"intercepting onOutputStreamReady twice");
this._streamReadyInterceptCreator = streamReadyInterceptCreator;
if (this._waiter)
{
this._waiter.callback =
new streamReadyInterceptCreator(this._waiter.callback);
}
},
/**
* Removes a previously-registered stream-ready wrapper creator function,
* also clearing any current wrapping.
*/
removeStreamReadyInterceptor: function()
{
dumpn("*** [" + this.name + "].removeStreamReadyInterceptor()");
do_check_true(this._streamReadyInterceptCreator !== null,
"removing interceptor when none present?");
this._streamReadyInterceptCreator = null;
// Unwrap any pending callback; this relies on the wrapper exposing the
// original callback as its wrappedCallback property.
if (this._waiter)
this._waiter.callback = this._waiter.callback.wrappedCallback;
},
//
// see nsIAsyncOutputStream.asyncWait
//
asyncWait: function asyncWait(callback, flags, requestedCount, target)
{
dumpn("*** [" + this.name + "].asyncWait");
do_check_true(callback && typeof callback !== "function");
var closureOnly =
(flags & Ci.nsIAsyncInputStream.WAIT_CLOSURE_ONLY) !== 0;
do_check_true(this._waiter === null ||
(this._waiter.closureOnly && !closureOnly),
"asyncWait already called with a non-closure-only " +
"callback? unexpected!");
this._waiter =
{
callback:
this._streamReadyInterceptCreator
? new this._streamReadyInterceptCreator(callback)
: callback,
closureOnly: closureOnly,
requestedCount: requestedCount,
eventTarget: target,
toString: function toString()
{
return "waiter(" + (closureOnly ? "closure only, " : "") +
"requestedCount: " + requestedCount + ", target: " +
target + ")";
}
};
if ((!closureOnly && this._writable >= requestedCount) ||
!Components.isSuccessCode(this.status))
{
this._notify();
}
},
//
// see nsIAsyncOutputStream.closeWithStatus
//
closeWithStatus: function closeWithStatus(status)
{
dumpn("*** [" + this.name + "].closeWithStatus(" + status + ")");
if (!Components.isSuccessCode(self._status))
{
dumpn("*** ignoring redundant closure of [input " + this.name + "] " +
"because it's already closed (status " + self._status + ")");
return;
}
if (Components.isSuccessCode(status))
status = Cr.NS_BASE_STREAM_CLOSED;
self._status = status;
if (input._waiter)
input._notify();
if (this._waiter)
this._notify();
},
//
// see nsIBinaryOutputStream.writeByteArray
//
writeByteArray: function writeByteArray(bytes, length)
{
dumpn("*** [" + this.name + "].writeByteArray" +
"([" + bytes + "], " + length + ")");
do_check_eq(bytes.length, length, "sanity");
// Writing to a closed pipe throws the (bare nsresult) closure status.
if (!Components.isSuccessCode(self._status))
throw self._status;
// Not usable while increments from makeWritableByIncrements are pending.
do_check_eq(this._writableAmounts.length, 0,
"writeByteArray can't support specified-length writes");
// Writes here are all-or-nothing.
if (this._writable < length)
throw Cr.NS_BASE_STREAM_WOULD_BLOCK;
self._data.push.apply(self._data, bytes);
this._writable -= length;
// When the input end has no readability limit, newly written data is
// readable at once, so wake up a reader waiting for data.
if (input._readable === Infinity && input._waiter &&
!input._waiter.closureOnly)
{
input._notify();
}
},
//
// see nsIOutputStream.write
//
write: function write(str, length)
{
dumpn("*** [" + this.name + "].write");
do_check_eq(str.length, length, "sanity");
// Writing to a closed pipe throws the (bare nsresult) closure status.
if (!Components.isSuccessCode(self._status))
throw self._status;
if (this._writable === 0)
throw Cr.NS_BASE_STREAM_WOULD_BLOCK;
// Decide how much of str to accept: as much as fits when no increments are
// pending, otherwise exactly the next pending increment.
var actualWritten;
if (this._writableAmounts.length === 0)
{
actualWritten = Math.min(this._writable, length);
}
else
{
do_check_true(this._writable >= this._writableAmounts[0],
"writable amounts value greater than writable data?");
do_check_eq(this._writable, sum(this._writableAmounts),
"total writable amount not equal to sum of writable " +
"increments");
// NOTE(review): the increment is not clamped to |length|; presumably the
// tests never configure an increment larger than the data offered --
// confirm.
actualWritten = this._writableAmounts.shift();
}
// Buffer the accepted prefix of str as an array of character codes.
var bytes = str.substring(0, actualWritten)
.split("")
.map(function(v) { return v.charCodeAt(0); });
self._data.push.apply(self._data, bytes);
this._writable -= actualWritten;
// When the input end has no readability limit, newly written data is
// readable at once, so wake up a reader waiting for data.
if (input._readable === Infinity && input._waiter &&
!input._waiter.closureOnly)
{
input._notify();
}
return actualWritten;
},
/**
* Increase the amount of data that can be written without blocking by the
* given number of bytes, triggering future notifications when required.
*
* @param count : uint
* the number of bytes of additional data to make writable
*/
makeWritable: function makeWritable(count)
{
dumpn("*** [" + this.name + "].makeWritable(" + count + ")");
// Only legal while the pipe is still open.
do_check_true(Components.isSuccessCode(self._status));
this._writable += count;
// Notify a pending writer once the space it asked for is available;
// closure-only waiters are not interested in space.
var waiter = this._waiter;
if (waiter && !waiter.closureOnly &&
waiter.requestedCount <= this._writable)
{
this._notify();
}
},
/**
* Increase the amount of data that can be written without blocking, but
* do so by specifying a number of bytes that will be written each time
* a write occurs, even as asyncWait notifications are initially triggered
* as usual. Thus, rather than writes eagerly writing everything possible
* at each step, attempts to write out data by segment devolve into a
* partial segment write, then another, and so on until the amount of data
* specified as permitted to be written, has been written.
*
* Note that the writeByteArray method is incompatible with the previous
* calling of this method, in that, until all increments provided to this
* method have been consumed, writeByteArray cannot be called. Once all
* increments have been consumed, writeByteArray may again be called.
*
* @param increments : [uint]
* an array whose elements are positive numbers of bytes to permit to be
* written each time write() is subsequently called on this, ignoring
* the total amount of writable space specified by the sum of all
* increments
*/
makeWritableByIncrements: function makeWritableByIncrements(increments)
{
dumpn("*** [" + this.name + "].makeWritableByIncrements" +
"([" + increments.join(", ") + "])");
do_check_true(increments.length > 0, "bad increments");
do_check_true(increments.every(function(v) { return v > 0; }),
"zero increment?");
// Only legal while the pipe is still open.
do_check_true(Components.isSuccessCode(self._status));
this._writable += sum(increments);
// The caller's array is stored by reference (write() later shifts entries
// off it) and replaces any increments set by a previous call.
this._writableAmounts = increments;
// Notify a pending writer once the space it asked for is available;
// closure-only waiters are not interested in space.
var waiter = this._waiter;
if (waiter && !waiter.closureOnly &&
waiter.requestedCount <= this._writable)
{
this._notify();
}
},
/**
* Dispatches an event to call a previously-registered stream-ready
* callback.
*/
_notify: function _notify()
{
dumpn("*** [" + this.name + "]._notify()");
var waiter = this._waiter;
do_check_true(waiter !== null, "no waiter?");
// At most one event is in flight at a time; calls made while an event is
// already dispatched do nothing further.
if (this._event === null)
{
var event = this._event =
{
run: function run()
{
// Clear the wait before calling out, so the callback is free to call
// asyncWait again.  The waiter captured when the event was created is
// the one notified, even if this._waiter has been replaced since.
output._waiter = null;
output._event = null;
try
{
waiter.callback.onOutputStreamReady(output);
}
catch (e)
{
do_throw("error calling onOutputStreamReady: " + e);
}
}
};
waiter.eventTarget.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL);
}
},
// Returns this object for nsIAsyncOutputStream, nsIOutputStream and
// nsISupports; throws NS_ERROR_NO_INTERFACE for any other interface.
QueryInterface: function QueryInterface(iid)
{
if (iid.equals(Ci.nsIAsyncOutputStream) ||
iid.equals(Ci.nsIOutputStream) ||
iid.equals(Ci.nsISupports))
{
return this;
}
throw Cr.NS_ERROR_NO_INTERFACE;
}
};
}
/**
* Represents a sequence of interactions to perform with a copier, in a given
* order and at the desired time intervals.
*
* @param name : string
* test name, used in debugging output
* @param next : function
* a function called when the test completes
*/
function CopyTest(name, next)
{
/** Name used in debugging output. */
this.name = name;
/** A function called when the test completes. */
this._done = next;
// Pipe feeding the copier: the test writes into its output end and the
// copier reads from its input end.
var sourcePipe = new CustomPipe(name + "-source");
/** The source of data for the copier to copy. */
this._source = sourcePipe.inputStream;
/**
* The sink to which to write data which will appear in the copier's source.
*/
this._copyableDataStream = sourcePipe.outputStream;
// Pipe receiving the copier's output: the copier writes into its output end
// and the test reads back from its input end.
var sinkPipe = new CustomPipe(name + "-sink");
/** The sink to which the copier copies data. */
this._sink = sinkPipe.outputStream;
/** Input stream from which to read data the copier's written to its sink. */
this._copiedDataStream = sinkPipe.inputStream;
// Whatever the copier writes to the sink becomes readable by the test
// immediately, without explicit makeReadable calls.
this._copiedDataStream.disableReadabilityLimit();
/**
* True if there's a callback waiting to read data written by the copier to
* its output, from the input end of the pipe representing the copier's sink.
*/
this._waitingForData = false;
/**
* An array of the bytes of data expected to be written to output by the
* copier when this test runs.
*/
this._expectedData = undefined;
/** Array of bytes of data received so far. */
this._receivedData = [];
/** The expected final status returned by the copier. */
this._expectedStatus = -1;
/** The actual final status returned by the copier. */
this._actualStatus = -1;
/** The most recent sequence of bytes written to output by the copier. */
this._lastQuantum = [];
/**
* True iff we've received the last quantum of data written to the sink by the
* copier.
*/
this._allDataWritten = false;
/**
* True iff the copier has notified its associated stream listener of
* completion.
*/
this._copyingFinished = false;
/** Index of the next task to execute while driving the copier. */
this._currentTask = 0;
/** Array containing all tasks to run. */
this._tasks = [];
/** The copier used by this test. */
// NOTE(review): this object is passed as the copier's third argument --
// presumably the observer/listener notified of copy progress; confirm against
// WriteThroughCopier.
this._copier =
new WriteThroughCopier(this._source, this._sink, this, null);
// Start watching for data written by the copier to the sink.
this._waitForWrittenData();
}
CopyTest.prototype =
{
/**
* Adds the given array of bytes to data in the copier's source.
*
* @param bytes : [uint]
* array of bytes of data to add to the source for the copier
*/
addToSource: function addToSource(bytes)
{
var self = this;
// Nothing is written now; the write is queued as a task.
this._addToTasks(function addToSourceTask()
{
note("addToSourceTask");
try
{
// Grant exactly enough space for the bytes, then write them into the
// output end of the source pipe.
self._copyableDataStream.makeWritable(bytes.length);
self._copyableDataStream.writeByteArray(bytes, bytes.length);
}
finally
{
// Stage the next task even if the write threw.
self._stageNextTask();
}
});
},
/**
* Makes bytes of data previously added to the source available to be read by
* the copier.
*
* @param count : uint
* number of bytes to make available for reading
*/
makeSourceReadable: function makeSourceReadable(count)
{
var self = this;
// Nothing happens now; the change is queued as a task.
this._addToTasks(function makeSourceReadableTask()
{
note("makeSourceReadableTask");
self._source.makeReadable(count);
self._stageNextTask();
});
},
/**
* Increases available space in the sink by the given amount, waits for the
* given series of arrays of bytes to be written to sink by the copier, and
* causes execution to asynchronously continue to the next task when the last
* of those arrays of bytes is received.
*
* @param bytes : uint
* number of bytes of space to make available in the sink
* @param dataQuantums : [[uint]]
* array of byte arrays to expect to be written in sequence to the sink
*/
makeSinkWritableAndWaitFor:
function makeSinkWritableAndWaitFor(bytes, dataQuantums)
{
var self = this;
do_check_eq(bytes,
dataQuantums.reduce(function(partial, current)
{
return partial + current.length;
}, 0),
"bytes/quantums mismatch");
function increaseSinkSpaceTask()
{
/* Now do the actual work to trigger the interceptor. */
self._sink.makeWritable(bytes);
}
this._waitForHelper("increaseSinkSpaceTask",
dataQuantums, increaseSinkSpaceTask);
},
/**
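* Increases available space in the sink by the given amount, but in increments
* matching the lengths of the given arrays of bytes, so that each write the
* copier makes to the sink is limited to the next increment. Waits for those
* arrays of bytes to be written to the sink by the copier, and causes
* execution to asynchronously continue to the next task when the last of them
* is received.
*
* @param bytes : uint
* total number of bytes of space to make available in the sink; must equal
* the sum of the lengths of the arrays in dataQuantums
* @param dataQuantums : [[uint]]
* array of byte arrays to expect to be written in sequence to the sink; the
* length of each array is the size of the corresponding write increment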
makeSinkWritableByIncrementsAndWaitFor:
function makeSinkWritableByIncrementsAndWaitFor(bytes, dataQuantums)
{
var self = this;
// Each expected quantum's length is the size of one write increment; the
// increments must add up to the total space being granted.
var desiredAmounts = dataQuantums.map(function(v) { return v.length; });
do_check_eq(bytes, sum(desiredAmounts), "bytes/quantums mismatch");
function increaseSinkSpaceByIncrementsTask()
{
/* Now do the actual work to trigger the interceptor incrementally. */
self._sink.makeWritableByIncrements(desiredAmounts);
}
this._waitForHelper("increaseSinkSpaceByIncrementsTask",
dataQuantums, increaseSinkSpaceByIncrementsTask);
},
/**
* Close the copier's source stream, then asynchronously continue to the next
* task.
*
* @param status : nsresult
* the status to provide when closing the copier's source stream
*/
closeSource: function closeSource(status)
{
var self = this;
this._addToTasks(function closeSourceTask()
{
note("closeSourceTask");
self._source.closeWithStatus(status);
self._stageNextTask();
});
},
/**
* Makes the given number of bytes of space available in the sink and closes
* the copier's source stream, then waits for the given series of arrays of
* bytes to be written to the sink before asynchronously continuing to the next
* task.
*
* @param status : nsresult
* the status to provide when closing the copier's source stream
* @param bytes : uint
* number of bytes of space to make available in the sink
* @param dataQuantums : [[uint]]
* array of byte arrays to expect to be written in sequence to the sink
*/
closeSourceAndWaitFor:
function closeSourceAndWaitFor(status, bytes, dataQuantums)
{
var self = this;
do_check_eq(bytes, sum(dataQuantums.map(function(v) { return v.length; })),
"bytes/quantums mismatch");
function closeSourceAndWaitForTask()
{
self._sink.makeWritable(bytes);
self._copyableDataStream.closeWithStatus(status);
}
this._waitForHelper("closeSourceAndWaitForTask",
dataQuantums, closeSourceAndWaitForTask);
},
/**
* Closes the copier's sink stream, providing the given status, then
* asynchronously continue to the next task.
*
* @param status : nsresult
* the status to provide when closing the copier's sink stream
*/
closeSink: function closeSink(status)
{
var self = this;
this._addToTasks(function closeSinkTask()
{
note("closeSinkTask");
self._sink.closeWithStatus(status);
self._stageNextTask();
});
},
/**
* Closes the copier's source stream, then immediately closes the copier's
* sink stream, then asynchronously continues to the next task.
*
* @param sourceStatus : nsresult
* the status to provide when closing the copier's source stream
* @param sinkStatus : nsresult
* the status to provide when closing the copier's sink stream
*/
closeSourceThenSink: function closeSourceThenSink(sourceStatus, sinkStatus)
{
var self = this;
this._addToTasks(function closeSourceThenSinkTask()
{
note("closeSourceThenSinkTask");
self._source.closeWithStatus(sourceStatus);
self._sink.closeWithStatus(sinkStatus);
self._stageNextTask();
});
},
/**
* Closes the copier's sink stream, then immediately closes the copier's
* source stream, then asynchronously continues to the next task.
*
* @param sinkStatus : nsresult
* the status to provide when closing the copier's sink stream
* @param sourceStatus : nsresult
* the status to provide when closing the copier's source stream
*/
closeSinkThenSource: function closeSinkThenSource(sinkStatus, sourceStatus)
{
var self = this;
this._addToTasks(function closeSinkThenSourceTask()
{
note("closeSinkThenSource");
self._sink.closeWithStatus(sinkStatus);
self._source.closeWithStatus(sourceStatus);
self._stageNextTask();
});
},
/**
* Indicates that the given status is expected to be returned when the stream
* listener for the copy indicates completion, that the expected data copied
* by the copier to sink are the concatenation of the arrays of bytes in
* receivedData, and kicks off the tasks in this test.
*
* @param expectedStatus : nsresult
* the status expected to be returned by the copier at completion
* @param receivedData : [[uint]]
* an array containing arrays of bytes whose concatenation constitutes the
* expected copied data
*/
expect: function expect(expectedStatus, receivedData)
{
this._expectedStatus = expectedStatus;
this._expectedData = [];
for (var i = 0, sz = receivedData.length; i < sz; i++)
this._expectedData.push.apply(this._expectedData, receivedData[i]);
this._stageNextTask();
},
/**
* Sets up a stream interceptor that will verify that each piece of data
* written to the sink by the copier corresponds to the currently expected
* pieces of data, calls the trigger, then waits for those pieces of data to
* be received. Once all have been received, the interceptor is removed and
* the next task is asynchronously executed.
*
* @param name : string
* name of the task created by this, used in debugging output
* @param dataQuantums : [[uint]]
* array of expected arrays of bytes to be written to the sink by the copier
* @param trigger : function() : void
* function to call after setting up the interceptor to wait for
* notifications (which will be generated as a result of this function's
* actions)
*/
_waitForHelper: function _waitForHelper(name, dataQuantums, trigger)
{
var self = this;
this._addToTasks(function waitForHelperTask()
{
note(name);
var quantumIndex = 0;
/*
* Intercept all data-available notifications so we can continue when all
* the ones we expect have been received.
*/
var streamReadyCallback =
{
onInputStreamReady: function wrapperOnInputStreamReady(input)
{
dumpn("*** streamReadyCallback.onInputStreamReady" +
"(" + input.name + ")");
do_check_eq(this, streamReadyCallback, "sanity");
try
{
if (quantumIndex < dataQuantums.length)
{
var quantum = dataQuantums[quantumIndex++];
var sz = quantum.length;
do_check_eq(self._lastQuantum.length, sz,
"different quantum lengths");
for (var i = 0; i < sz; i++)
{
do_check_eq(self._lastQuantum[i], quantum[i],
"bad data at " + i);
}
dumpn("*** waiting to check remaining " +
(dataQuantums.length - quantumIndex) + " quantums...");
}
}
finally
{
if (quantumIndex === dataQuantums.length)
{
dumpn("*** data checks completed! next task...");
self._copiedDataStream.removeStreamReadyInterceptor();
self._stageNextTask();
}
}
}
};
var interceptor =
createStreamReadyInterceptor(streamReadyCallback, "onInputStreamReady");
self._copiedDataStream.interceptStreamReadyCallbacks(interceptor);
/* Do the deed. */
trigger();
});
},
/**
* Initiates asynchronous waiting for data written to the copier's sink to be
* available for reading from the input end of the sink's pipe. The callback
* stores the received data for comparison in the interceptor used in the
* callback added by _waitForHelper and signals test completion when it
* receives a zero-data-available notification (if the copier has notified
* that it is finished; otherwise allows execution to continue until that has
* occurred).
*/
_waitForWrittenData: function _waitForWrittenData()
{
dumpn("*** _waitForWrittenData (" + this.name + ")");
var self = this;
var outputWrittenWatcher =
{
onInputStreamReady: function onInputStreamReady(input)
{
dumpn("*** outputWrittenWatcher.onInputStreamReady" +
"(" + input.name + ")");
if (self._allDataWritten)
{
do_throw("ruh-roh! why are we getting notified of more data " +
"after we should have received all of it?");
}
self._waitingForData = false;
try
{
var avail = input.available();
}
catch (e)
{
dumpn("*** available() threw! error: " + e);
if (self._completed)
{
dumpn("*** NB: this isn't a problem, because we've copied " +
"completely now, and this notify may have been expedited " +
"by maybeNotifyFinally such that we're being called when " +
"we can *guarantee* nothing is available any more");
}
avail = 0;
}
if (avail > 0)
{
var data = input.readByteArray(avail);
do_check_eq(data.length, avail,
"readByteArray returned wrong number of bytes?");
self._lastQuantum = data;
self._receivedData.push.apply(self._receivedData, data);
}
if (avail === 0)
{
dumpn("*** all data received!");
self._allDataWritten = true;
if (self._copyingFinished)
{
dumpn("*** copying already finished, continuing to next test");
self._testComplete();
}
else
{
dumpn("*** copying not finished, waiting for that to happen");
}
return;
}
self._waitForWrittenData();
}
};
this._copiedDataStream.asyncWait(outputWrittenWatcher, 0, 1,
gThreadManager.currentThread);
this._waitingForData = true;
},
/**
* Indicates this test is complete, does the final data-received and copy
* status comparisons, and calls the test-completion function provided when
* this test was first created.
*/
_testComplete: function _testComplete()
{
dumpn("*** CopyTest(" + this.name + ") complete! " +
"On to the next test...");
try
{
do_check_true(this._allDataWritten, "expect all data written now!");
do_check_true(this._copyingFinished, "expect copying finished now!");
do_check_eq(this._actualStatus, this._expectedStatus,
"wrong final status");
var expected = this._expectedData, received = this._receivedData;
dumpn("received: [" + received + "], expected: [" + expected + "]");
do_check_eq(received.length, expected.length, "wrong data");
for (var i = 0, sz = expected.length; i < sz; i++)
do_check_eq(received[i], expected[i], "bad data at " + i);
}
catch (e)
{
dumpn("!!! ERROR PERFORMING FINAL " + this.name + " CHECKS! " + e);
throw e;
}
finally
{
dumpn("*** CopyTest(" + this.name + ") complete! " +
"Invoking test-completion callback...");
this._done();
}
},
/** Dispatches an event at this thread which will run the next task. */
_stageNextTask: function _stageNextTask()
{
dumpn("*** CopyTest(" + this.name + ")._stageNextTask()");
if (this._currentTask === this._tasks.length)
{
dumpn("*** CopyTest(" + this.name + ") tasks complete!");
return;
}
var task = this._tasks[this._currentTask++];
var self = this;
var event =
{
run: function run()
{
try
{
task();
}
catch (e)
{
do_throw("exception thrown running task: " + e);
}
}
};
gThreadManager.currentThread.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL);
},
/**
* Adds the given function as a task to be run at a later time.
*
* @param task : function() : void
* the function to call as a task
*/
_addToTasks: function _addToTasks(task)
{
this._tasks.push(task);
},
//
// see nsIRequestObserver.onStartRequest
//
onStartRequest: function onStartRequest(self, _)
{
dumpn("*** CopyTest.onStartRequest (" + self.name + ")");
do_check_true(_ === null);
do_check_eq(this._receivedData.length, 0);
do_check_eq(this._lastQuantum.length, 0);
},
//
// see nsIRequestObserver.onStopRequest
//
onStopRequest: function onStopRequest(self, _, status)
{
dumpn("*** CopyTest.onStopRequest (" + self.name + ", " + status + ")");
do_check_true(_ === null);
this._actualStatus = status;
this._copyingFinished = true;
if (this._allDataWritten)
{
dumpn("*** all data written, continuing with remaining tests...");
this._testComplete();
}
else
{
/*
* Everything's copied as far as the copier is concerned. However, there
* may be a backup transferring from the output end of the copy sink to
* the input end where we can actually verify that the expected data was
* written as expected, because that transfer occurs asynchronously. If
* we do final data-received checks now, we'll miss still-pending data.
* Therefore, to wrap up this copy test we still need to asynchronously
* wait on the input end of the sink until we hit end-of-stream or some
* error condition. Then we know we're done and can continue with the
* next test.
*/
dumpn("*** not all data copied, waiting for that to happen...");
if (!this._waitingForData)
this._waitForWrittenData();
this._copiedDataStream.maybeNotifyFinally();
}
}
};