| import unittest |
| from test.support import import_helper, threading_helper |
| |
| resource = import_helper.import_module("resource") |
| |
| |
# Number of worker threads the test runs concurrently.
NTHREADS = 10
# Iterations of the empty busy loop each worker executes before it samples
# its resource usage.
LOOP_PER_THREAD = 1000
| |
| |
@threading_helper.requires_working_threading()
class ResourceTest(unittest.TestCase):
    """Exercise resource.getrusage() from several threads at once."""

    @unittest.skipUnless(hasattr(resource, "getrusage"), "needs getrusage")
    @unittest.skipUnless(
        hasattr(resource, "RUSAGE_THREAD"), "needs RUSAGE_THREAD"
    )
    def test_getrusage(self):
        # Every worker runs a short busy loop, then checks that the user time
        # reported for the whole process exceeds the user time reported for
        # the worker itself, and records its own value for the final check.
        def spin_then_sample(samples):
            """Busy-loop, compare process vs. thread user time, record it."""
            for _ in range(LOOP_PER_THREAD):
                pass

            # Sample the process before the thread, same order as always.
            whole_process = resource.getrusage(resource.RUSAGE_SELF)
            this_thread = resource.getrusage(resource.RUSAGE_THREAD)
            thread_utime = this_thread.ru_utime
            # Process user time should be greater than thread user time
            self.assertGreater(whole_process.ru_utime, thread_utime)
            samples.append(thread_utime)

        per_thread_utime = []
        threading_helper.run_concurrently(
            nthreads=NTHREADS,
            worker_func=spin_then_sample,
            args=(per_thread_utime,),
        )

        process_utime = resource.getrusage(resource.RUSAGE_SELF).ru_utime
        # Each worker must have contributed exactly one sample.
        self.assertEqual(len(per_thread_utime), NTHREADS)
        # Process user time should be greater than sum of all thread user times
        self.assertGreater(process_utime, sum(per_thread_utime))
| |
| |
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()