| # Copyright 2015 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import random |
| |
| import mojo_unittest |
| from mojo_bindings import promise |
| |
| # pylint: disable=F0401 |
| import mojo_system as system |
| |
| # pylint: disable=F0401 |
| from mojo_utils import data_pipe_utils |
| |
| |
| def _GetRandomBuffer(size): |
| random.seed(size) |
| return bytearray(''.join(chr(random.randint(0, 255)) for i in xrange(size))) |
| |
| |
| class DataPipeCopyTest(mojo_unittest.MojoTestCase): |
| def setUp(self): |
| super(DataPipeCopyTest, self).setUp() |
| self.handles = system.DataPipe() |
| self.error = None |
| |
| def tearDown(self): |
| self.handles = None |
| super(DataPipeCopyTest, self).tearDown() |
| |
| def _writeDataAndClose(self, handle, data): |
| status, num_bytes_written = handle.WriteData(data) |
| handle.Close() |
| self.assertEquals(system.RESULT_OK, status) |
| self.assertEquals(len(data), num_bytes_written) |
| |
| def _copyDataFromPipe(self, handle, expected_data, |
| deadline=system.DEADLINE_INDEFINITE): |
| self._VerifyDataCopied(data_pipe_utils.CopyFromDataPipe( |
| handle, deadline), expected_data).Catch(self._CatchError) |
| |
| def _CatchError(self, error): |
| if self.loop: |
| self.loop.Quit() |
| self.error = error |
| |
| @promise.async |
| def _VerifyDataCopied(self, data, expected_data): |
| self.assertEquals(expected_data, data) |
| self.loop.Quit() |
| |
| def _runAndCheckError(self): |
| self.loop.Run() |
| if self.error: |
| # pylint: disable=E0702 |
| raise self.error |
| |
| def _testEagerWrite(self, data): |
| self._writeDataAndClose(self.handles.producer_handle, data) |
| self._copyDataFromPipe(self.handles.consumer_handle, data) |
| self._runAndCheckError() |
| |
| def _testDelayedWrite(self, data): |
| self._copyDataFromPipe(self.handles.consumer_handle, data) |
| self._writeDataAndClose(self.handles.producer_handle, data) |
| self._runAndCheckError() |
| |
| def testTimeout(self): |
| self._copyDataFromPipe(self.handles.consumer_handle, bytearray(), |
| deadline=100) |
| with self.assertRaises(data_pipe_utils.DataPipeCopyException): |
| self._runAndCheckError() |
| |
| def testCloseProducerWithoutWriting(self): |
| self._copyDataFromPipe(self.handles.consumer_handle, bytearray()) |
| self.handles.producer_handle.Close() |
| self._runAndCheckError() |
| |
| def testEagerWriteOfEmptyData(self): |
| self._testEagerWrite(bytearray()) |
| |
| def testDelayedWriteOfEmptyData(self): |
| self._testDelayedWrite(bytearray()) |
| |
| def testEagerWriteOfNonEmptyData(self): |
| self._testEagerWrite(_GetRandomBuffer(1024)) |
| |
| def testDelayedWriteOfNonEmptyData(self): |
| self._testDelayedWrite(_GetRandomBuffer(1024)) |
| |
| def testEagerWriteOfLargeBuffer(self): |
| self._testEagerWrite(_GetRandomBuffer(32 * 1024)) |
| |
| def testDelayedWriteOfLargeBuffer(self): |
| self._testDelayedWrite(_GetRandomBuffer(32 * 1024)) |