diff --git a/tests/fprintd.py b/tests/fprintd.py index 00b36fc..616028a 100644 --- a/tests/fprintd.py +++ b/tests/fprintd.py @@ -132,9 +132,6 @@ ctx = GLib.main_context_default() class FPrintdTest(dbusmock.DBusTestCase): - socket_env = 'FP_VIRTUAL_IMAGE' - device_driver = 'virtual_image' - @staticmethod def path_from_service_file(sf): with open(SERVICE_FILE) as f: @@ -199,7 +196,7 @@ class FPrintdTest(dbusmock.DBusTestCase): dbusmock.DBusTestCase.tearDownClass() - def daemon_start(self, driver='Virtual image device'): + def daemon_start(self, driver='Virtual image device for debugging'): timeout = get_timeout('daemon_start') # seconds env = os.environ.copy() env['G_DEBUG'] = 'fatal-criticals' @@ -506,9 +503,12 @@ class FPrintdTest(dbusmock.DBusTestCase): raise GLib.GError(e.output) -class FPrintdVirtualDeviceBaseTest(FPrintdTest): +class FPrintdVirtualImageDeviceBaseTests(FPrintdTest): + socket_env = 'FP_VIRTUAL_IMAGE' + device_driver = 'virtual_image' + driver_name = 'Virtual image device for debugging' - driver_name = 'Virtual image device' +class FPrintdVirtualDeviceBaseTest(FPrintdVirtualImageDeviceBaseTests): def setUp(self): super().setUp() @@ -656,34 +656,58 @@ class FPrintdVirtualStorageDeviceBaseTest(FPrintdVirtualDeviceBaseTest): device_driver = 'virtual_device_storage' driver_name = 'Virtual device with storage and identification for debugging' - def send_command(self, command, *args): - self.assertIn(command, ['INSERT', 'REMOVE', 'SCAN', 'ERROR', 'LIST']) - - with Connection(self.sockaddr) as con: - params = ' '.join(str(p) for p in args) - con.sendall('{} {}'.format(command, params).encode('utf-8')) - res = [] - while True: - r = con.recv(1024) - if not r: - break - res.append(r) + def _send_command(self, con, command, *args): + params = ' '.join(str(p) for p in args) + con.sendall('{} {}'.format(command, params).encode('utf-8')) + res = [] + while True: + r = con.recv(1024) + if not r: + break + res.append(r) return b''.join(res) + def send_command(self, command, *args): + self.assertIn(command, ['INSERT', 'REMOVE', 'SCAN', 'ERROR', 'RETRY', + 'FINGER', 'UNPLUG', 'SLEEP', 'SET_ENROLL_STAGES', 'SET_SCAN_TYPE', + 'SET_CANCELLATION_ENABLED', 'LIST']) + + with Connection(self.sockaddr) as con: + res = self._send_command(con, command, *args) + + return res + + def send_image(self, image, con=None): + # This is meant to simulate the image scanning for image device + self.send_command('SCAN', image) + + def send_error(self, error=FPrint.DeviceError.GENERAL, con=None): + self.send_command('ERROR', int(error)) + + def send_retry(self, retry_error=FPrint.DeviceRetry.TOO_SHORT, con=None): + self.send_command('RETRY', int(retry_error)) + + def send_remove(self, con=None): + self.skipTest('Not implemented for {}'.format(self.device_driver)) + + def send_finger_automatic(self, automatic, con=None, iterate=True): + if not automatic: + return + self.skipTest('Not implemented for {}'.format(self.device_driver)) + + def send_finger_report(self, has_finger, con=None, iterate=True): + self.send_command('FINGER', 1 if has_finger else 0) + + while iterate and self.finger_present != has_finger: + ctx.iteration(False) + def enroll_print(self, nick, finger='right-index-finger', expected_result='enroll-completed'): - # Needs to assume success - self.send_command('SCAN', nick) + # Using the name of the image as the print id + super().enroll_image(img=nick, finger=finger, expected_result=expected_result) - self.device.EnrollStart('(s)', finger) - - # We only "scan" once, but multiple enroll stages are still signalled - stages = self.device.get_cached_property('num-enroll-stages').unpack() - self.wait_for_result(expected_result) - - self.device.EnrollStop() - self.assertEqual(self._last_result, expected_result) +class FPrintdVirtualStorageDeviceTests(FPrintdVirtualStorageDeviceBaseTest): def test_garbage_collect(self): self.device.Claim('(s)', 'testuser') @@ -768,6 +792,12 @@ class FPrintdVirtualStorageDeviceBaseTest(FPrintdVirtualDeviceBaseTest): self.device.Release() +class FPrintdVirtualNoStorageDeviceBaseTest(FPrintdVirtualStorageDeviceBaseTest): + + socket_env = 'FP_VIRTUAL_DEVICE' + device_driver = 'virtual_device' + driver_name = 'Virtual device for debugging' + class FPrintdManagerTests(FPrintdVirtualDeviceBaseTest): def setUp(self): @@ -783,7 +813,7 @@ class FPrintdManagerTests(FPrintdVirtualDeviceBaseTest): self.device.get_object_path()) -class FPrintdManagerPreStartTests(FPrintdTest): +class FPrintdManagerPreStartTests(FPrintdVirtualImageDeviceBaseTests): def test_manager_get_no_devices(self): os.environ['FP_DRIVERS_WHITELIST'] = 'hopefully_no_existing_driver' @@ -838,7 +868,7 @@ class FPrintdVirtualDeviceTest(FPrintdVirtualDeviceBaseTest): def test_name_property(self): self.assertEqual(self.device.get_cached_property('name').unpack(), - 'Virtual image device for debugging') + self.driver_name) def test_enroll_stages_property(self): self.assertEqual(self.device.get_cached_property('num-enroll-stages').unpack(), 5) @@ -1124,6 +1154,9 @@ class FPrintdVirtualDeviceTest(FPrintdVirtualDeviceBaseTest): if not self._has_hotplug: self.skipTest("libfprint is too old for hotplug") + if self.device_driver == 'virtual_device_storage': + self.skipTest('Not implemented for virtual_device_storage') + self._polkitd_obj.SetAllowed([FprintDevicePermission.set_username, FprintDevicePermission.enroll]) self.device.Claim('(s)', 'testuser') @@ -1153,6 +1186,12 @@ class FPrintdVirtualDeviceTest(FPrintdVirtualDeviceBaseTest): self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) +class FPrintdVirtualDeviceStorageTest(FPrintdVirtualStorageDeviceBaseTest, + FPrintdVirtualDeviceTest): + # Repeat the tests for the Virtual storage device + pass + + class FPrintdVirtualDeviceClaimedTest(FPrintdVirtualDeviceBaseTest): def setUp(self): @@ -1734,10 +1773,18 @@ class FPrintdVirtualDeviceClaimedTest(FPrintdVirtualDeviceBaseTest): self.call_device_method_async('DeleteEnrolledFingers', '(s)', ['testuser']) self.call_device_method_async('DeleteEnrolledFingers', '(s)', ['testuser']) + accepted_exceptions = ['NoEnrolledPrints'] + if self.device_driver == 'virtual_device_storage': + accepted_exceptions.append('AlreadyInUse') + self.wait_for_device_reply_relaxed(expected_replies=2, - accepted_exceptions=['NoEnrolledPrints']) - self.assertEqual([GLib.Variant('()', ()), GLib.Variant('()', ())], - self.get_all_async_replies()) + accepted_exceptions=accepted_exceptions) + + if self.device_driver == 'virtual_device_storage': + self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) + else: + self.assertEqual([GLib.Variant('()', ()), GLib.Variant('()', ())], + self.get_all_async_replies()) def test_concourrent_delete_enrolled_fingers_unclaimed(self): self.enroll_image('whorl') @@ -1745,20 +1792,36 @@ class FPrintdVirtualDeviceClaimedTest(FPrintdVirtualDeviceBaseTest): self.call_device_method_async('DeleteEnrolledFingers', '(s)', ['testuser']) self.call_device_method_async('DeleteEnrolledFingers', '(s)', ['testuser']) + accepted_exceptions = ['NoEnrolledPrints'] + if self.device_driver == 'virtual_device_storage': + accepted_exceptions.append('AlreadyInUse') + self.wait_for_device_reply_relaxed(expected_replies=2, - accepted_exceptions=['NoEnrolledPrints']) - self.assertEqual([GLib.Variant('()', ()), GLib.Variant('()', ())], - self.get_all_async_replies()) + accepted_exceptions=accepted_exceptions) + + if self.device_driver == 'virtual_device_storage': + self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) + else: + self.assertEqual([GLib.Variant('()', ()), GLib.Variant('()', ())], + self.get_all_async_replies()) def test_concourrent_delete_enrolled_fingers2(self): self.enroll_image('whorl') self.call_device_method_async('DeleteEnrolledFingers2', '()', []) self.call_device_method_async('DeleteEnrolledFingers2', '()', []) + accepted_exceptions = ['NoEnrolledPrints'] + if self.device_driver == 'virtual_device_storage': + accepted_exceptions.append('AlreadyInUse') + self.wait_for_device_reply_relaxed(expected_replies=2, - accepted_exceptions=['NoEnrolledPrints']) - self.assertEqual([GLib.Variant('()', ()), GLib.Variant('()', ())], - self.get_all_async_replies()) + accepted_exceptions=accepted_exceptions) + + if self.device_driver == 'virtual_device_storage': + self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) + else: + self.assertEqual([GLib.Variant('()', ()), GLib.Variant('()', ())], + self.get_all_async_replies()) def test_concourrent_delete_enrolled_finger(self): self.enroll_image('whorl', finger='left-thumb') @@ -1766,10 +1829,18 @@ class FPrintdVirtualDeviceClaimedTest(FPrintdVirtualDeviceBaseTest): self.call_device_method_async('DeleteEnrolledFinger', '(s)', ['left-thumb']) self.call_device_method_async('DeleteEnrolledFinger', '(s)', ['right-thumb']) - # No failure is expected here since it's all sync - self.wait_for_device_reply(expected_replies=2) - self.assertEqual([GLib.Variant('()', ()), GLib.Variant('()', ())], - self.get_all_async_replies()) + accepted_exceptions = [] + if self.device_driver == 'virtual_device_storage': + accepted_exceptions.append('AlreadyInUse') + + self.wait_for_device_reply_relaxed(expected_replies=2, + accepted_exceptions=accepted_exceptions) + + if self.device_driver == 'virtual_device_storage': + self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) + else: + self.assertEqual([GLib.Variant('()', ()), GLib.Variant('()', ())], + self.get_all_async_replies()) def test_concourrent_release(self): self.call_device_method_async('Release', '()', []) @@ -1886,6 +1957,11 @@ class FPrintdVirtualDeviceEnrollTests(FPrintdVirtualDeviceBaseTest): self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) +class FPrintdVirtualDeviceStorageClaimedTest(FPrintdVirtualStorageDeviceBaseTest, + FPrintdVirtualDeviceClaimedTest): + pass + # Repeat the tests for the Virtual storage device + class FPrintdVirtualDeviceVerificationTests(FPrintdVirtualDeviceBaseTest): @classmethod @@ -1980,6 +2056,9 @@ class FPrintdVirtualDeviceVerificationTests(FPrintdVirtualDeviceBaseTest): self.assertEqual(self._last_result, 'verify-match') def test_multiple_verify_cancelled(self): + if self.device_driver != 'virtual_image': + self.skipTest('Relies on virtual_image driver specifics') + with Connection(self.sockaddr) as con: self.send_finger_automatic(False, con=con) self.send_finger_report(True, con=con) @@ -2037,6 +2116,9 @@ class FPrintdVirtualDeviceVerificationTests(FPrintdVirtualDeviceBaseTest): self.assertIn(GLib.Variant('()', ()), self.get_all_async_replies()) def test_verify_error_ignored_after_report(self): + if self.device_driver != 'virtual_image': + self.skipTest('Relies on virtual_image driver specifics') + with Connection(self.sockaddr) as con: self.send_finger_automatic(False, con=con) self.send_finger_report(True, con=con) @@ -2050,6 +2132,9 @@ class FPrintdVirtualDeviceVerificationTests(FPrintdVirtualDeviceBaseTest): self.wait_for_result(max_wait=200, expected='verify-match') def test_verify_stop_restarts_immediately(self): + if self.device_driver != 'virtual_image': + self.skipTest('Relies on virtual_image driver specifics') + self.send_image('tented_arch') self.wait_for_result() self.assertTrue(self._verify_stopped) @@ -2061,6 +2146,9 @@ class FPrintdVirtualDeviceVerificationTests(FPrintdVirtualDeviceBaseTest): self.wait_for_device_reply(expected_replies=2) def test_verify_stop_waits_for_completion(self): + if self.device_driver != 'virtual_image': + self.skipTest('Relies on virtual_image driver specifics') + self.stop_on_teardown = False with Connection(self.sockaddr) as con: @@ -2104,6 +2192,16 @@ class FPrintdVirtualDeviceVerificationTests(FPrintdVirtualDeviceBaseTest): self.assertTrue(self.get_async_replies(method='VerifyStop')) +class FPrintdVirtualDeviceStorageVerificationTests(FPrintdVirtualStorageDeviceBaseTest, + FPrintdVirtualDeviceVerificationTests): + # Repeat the tests for the Virtual storage device + pass + +class FPrintdVirtualDeviceNoStorageVerificationTests(FPrintdVirtualNoStorageDeviceBaseTest, + FPrintdVirtualDeviceVerificationTests): + # Repeat the tests for the Virtual device (with no storage) + pass + class FPrintdVirtualDeviceIdentificationTests(FPrintdVirtualDeviceVerificationTests): '''This class will just repeat the tests of FPrintdVirtualDeviceVerificationTests but with 'any' finger parameter (leading to an identification, when possible @@ -2116,6 +2214,16 @@ class FPrintdVirtualDeviceIdentificationTests(FPrintdVirtualDeviceVerificationTe cls.verify_finger = 'any' +class FPrintdVirtualDeviceStorageIdentificationTests(FPrintdVirtualStorageDeviceBaseTest, + FPrintdVirtualDeviceIdentificationTests): + # Repeat the tests for the Virtual storage device + pass + +class FPrintdVirtualDeviceNoStorageIdentificationTests(FPrintdVirtualNoStorageDeviceBaseTest, + FPrintdVirtualDeviceIdentificationTests): + # Repeat the tests for the Virtual device (with no storage) + pass + class FPrindConcurrentPolkitRequestsTest(FPrintdVirtualDeviceBaseTest): def wait_for_hanging_clients(self):