mirror of
https://github.com/ManchildProductions/UXP-Fixed.git
synced 2026-06-11 04:28:43 +00:00
58 lines
1.5 KiB
Python
58 lines
1.5 KiB
Python
#!/usr/bin/env python
|
|
|
|
import io
|
|
import os
|
|
import unittest
|
|
import proctest
|
|
from mozprocess import processhandler
|
|
|
|
here = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
class ProcTestOutput(proctest.ProcTest):
|
|
""" Class to test operations related to output handling """
|
|
|
|
def test_process_output_nonewline(self):
|
|
"""
|
|
Process is started, outputs data with no newline
|
|
"""
|
|
p = processhandler.ProcessHandler([self.python, "procnonewline.py"],
|
|
cwd=here)
|
|
|
|
p.run()
|
|
p.processOutput(timeout=5)
|
|
p.wait()
|
|
|
|
self.determine_status(p, False, ())
|
|
|
|
def test_stream_process_output(self):
|
|
"""
|
|
Process output stream does not buffer
|
|
"""
|
|
expected = '\n'.join([str(n) for n in range(0, 10)])
|
|
|
|
stream = io.BytesIO()
|
|
buf = io.BufferedRandom(stream)
|
|
|
|
p = processhandler.ProcessHandler([self.python, "proccountfive.py"],
|
|
cwd=here,
|
|
stream=buf)
|
|
|
|
p.run()
|
|
p.wait()
|
|
for i in range(5, 10):
|
|
stream.write(str(i) + '\n')
|
|
|
|
buf.flush()
|
|
self.assertEquals(stream.getvalue().strip(), expected)
|
|
|
|
# make sure mozprocess doesn't close the stream
|
|
# since mozprocess didn't create it
|
|
self.assertFalse(buf.closed)
|
|
buf.close()
|
|
|
|
self.determine_status(p, False, ())
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|