From 1a860aa882fc1e2b532e31d366685d6314a68ea2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marco=20Trevisan=20=28Trevi=C3=B1o=29?= Date: Fri, 6 Nov 2020 17:42:51 +0100 Subject: [PATCH] tests/fprintd: Add tests ensuring that concurrent calls to fprintd work Simulate the case in which multiple users are trying to access a device at the same time, verifying that the access is granted only to the one that first completes the authorization phase and that no other client is then allowed. --- tests/fprintd.py | 174 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) diff --git a/tests/fprintd.py b/tests/fprintd.py index eb93aef..fe0cbb1 100644 --- a/tests/fprintd.py +++ b/tests/fprintd.py @@ -1164,6 +1164,180 @@ class FPrintdVirtualDeviceIdentificationTests(FPrintdVirtualDeviceVerificationTe cls.verify_finger = 'any' +class FPrindConcurrentPolkitRequestsTest(FPrintdVirtualDeviceBaseTest): + + def wait_for_hanging_clients(self): + while not self._polkitd_obj.HaveHangingCalls(): + pass + self.assertTrue(self._polkitd_obj.HaveHangingCalls()) + + def start_hanging_gdbus_claim(self, user='testuser'): + gdbus = self.gdbus_device_method_call_process('Claim', [user]) + self.assertIsNone(gdbus.poll()) + self.wait_for_hanging_clients() + self.addCleanup(gdbus.kill) + return gdbus + + def test_hanging_claim_does_not_block_new_claim_external_client(self): + self._polkitd_obj.SetAllowed([ + 'net.reactivated.fprint.device.setusername', + 'net.reactivated.fprint.device.enroll' ]) + self._polkitd_obj.SimulateHang(True) + self._polkitd_obj.SetDelay(0.5) + + gdbus = self.start_hanging_gdbus_claim() + + self._polkitd_obj.SimulateHang(False) + self.device.Claim('(s)', self.get_current_user()) + + self.assertIsNone(gdbus.poll()) + self._polkitd_obj.ReleaseHangingCalls() + + gdbus.wait() + with self.assertFprintError('AlreadyInUse'): + raise GLib.GError(gdbus.stdout.read()) + + self.device.Release() + + def test_hanging_claim_does_not_block_new_claim(self): + self._polkitd_obj.SetAllowed([ + 'net.reactivated.fprint.device.setusername', + 'net.reactivated.fprint.device.enroll' ]) + self._polkitd_obj.SimulateHang(True) + self._polkitd_obj.SetDelay(0.5) + + self.call_device_method_async('Claim', '(s)', ['']) + self.wait_for_hanging_clients() + + self._polkitd_obj.SimulateHang(False) + self.device.Claim('(s)', self.get_current_user()) + + self._polkitd_obj.ReleaseHangingCalls() + + with self.assertFprintError('AlreadyInUse'): + self.wait_for_device_reply() + + self.device.Release() + + def test_hanging_claim_enroll_does_not_block_new_claim(self): + self._polkitd_obj.SetAllowed([ + 'net.reactivated.fprint.device.setusername', + 'net.reactivated.fprint.device.enroll' ]) + self._polkitd_obj.SimulateHangActions([ + 'net.reactivated.fprint.device.enroll']) + self._polkitd_obj.SetDelay(0.5) + + gdbus = self.start_hanging_gdbus_claim() + + self._polkitd_obj.SimulateHangActions(['']) + self.device.Claim('(s)', self.get_current_user()) + + self.assertIsNone(gdbus.poll()) + self._polkitd_obj.ReleaseHangingCalls() + + gdbus.wait() + with self.assertFprintError('AlreadyInUse'): + raise GLib.GError(gdbus.stdout.read()) + + self.device.Release() + + def test_hanging_claim_does_not_block_new_release(self): + self._polkitd_obj.SetAllowed(['net.reactivated.fprint.device.setusername']) + self._polkitd_obj.SimulateHang(True) + + gdbus = self.gdbus_device_method_call_process('Claim', ['testuser']) + self.addCleanup(gdbus.kill) + + self.wait_for_hanging_clients() + with self.assertFprintError('ClaimDevice'): + self.device.Release() + + self.assertIsNone(gdbus.poll()) + + def test_hanging_claim_does_not_block_list(self): + self._polkitd_obj.SetAllowed([ + 'net.reactivated.fprint.device.setusername', + 'net.reactivated.fprint.device.enroll', + 'net.reactivated.fprint.device.verify']) + + self.device.Claim('(s)', '') + self.enroll_image('whorl', finger='left-thumb') + self.device.Release() + + self._polkitd_obj.SimulateHangActions([ + 'net.reactivated.fprint.device.setusername']) + + gdbus = self.start_hanging_gdbus_claim() + + self.assertEqual(self.device.ListEnrolledFingers('(s)', + self.get_current_user()), ['left-thumb']) + + self.assertIsNone(gdbus.poll()) + + def test_hanging_claim_can_proceed_when_released(self): + self._polkitd_obj.SetAllowed([ + 'net.reactivated.fprint.device.setusername', + 'net.reactivated.fprint.device.verify']) + + self._polkitd_obj.SimulateHangActions([ + 'net.reactivated.fprint.device.setusername']) + + gdbus = self.start_hanging_gdbus_claim() + + self._polkitd_obj.SimulateHangActions(['']) + self.device.Claim('(s)', 'testuser') + self.device.Release() + + self.assertIsNone(gdbus.poll()) + + self._polkitd_obj.ReleaseHangingCalls() + gdbus.wait() + + self.assertEqual(gdbus.returncode, 0) + + def test_hanging_claim_does_not_block_empty_list(self): + self._polkitd_obj.SetAllowed([ + 'net.reactivated.fprint.device.setusername', + 'net.reactivated.fprint.device.enroll', + 'net.reactivated.fprint.device.verify']) + + self._polkitd_obj.SimulateHangActions([ + 'net.reactivated.fprint.device.setusername']) + + gdbus = self.start_hanging_gdbus_claim() + + with self.assertFprintError('NoEnrolledPrints'): + self.device.ListEnrolledFingers('(s)', self.get_current_user()) + + self.assertIsNone(gdbus.poll()) + + def test_hanging_claim_does_not_block_verification(self): + self._polkitd_obj.SetAllowed([ + 'net.reactivated.fprint.device.setusername', + 'net.reactivated.fprint.device.enroll', + 'net.reactivated.fprint.device.verify']) + + self.device.Claim('(s)', '') + self.enroll_image('whorl', finger='left-thumb') + self.device.Release() + + self._polkitd_obj.SimulateHangActions([ + 'net.reactivated.fprint.device.setusername']) + + gdbus = self.start_hanging_gdbus_claim() + + self.device.Claim('(s)', '') + self.device.VerifyStart('(s)', 'any') + self.send_image('whorl') + self.wait_for_result() + self.assertTrue(self._verify_stopped) + self.assertEqual(self._last_result, 'verify-match') + self.device.VerifyStop() + self.device.Release() + + self.assertIsNone(gdbus.poll()) + + class FPrintdUtilsTest(FPrintdVirtualDeviceBaseTest): @classmethod