From 74838f9efc5409fe6102f42c730478b1902a12c6 Mon Sep 17 00:00:00 2001 From: Benjamin Berg Date: Mon, 7 Oct 2019 15:14:21 +0200 Subject: [PATCH] tests: Add basic integration test This test uses the virtual image driver included in libfprint for testing. --- Makefile.am | 2 +- configure.ac | 1 + tests/Makefile.am | 9 + tests/fprintd.py | 466 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 477 insertions(+), 1 deletion(-) create mode 100644 tests/Makefile.am create mode 100755 tests/fprintd.py diff --git a/Makefile.am b/Makefile.am index ebec8a2..9abc242 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,5 +1,5 @@ AUTOMAKE_OPTIONS = dist-bzip2 -SUBDIRS = src data utils pam doc po +SUBDIRS = src data utils pam doc tests po EXTRA_DIST = TODO intltool-extract.in intltool-merge.in intltool-update.in DISTCHECK_CONFIGURE_FLAGS = --enable-gtk-doc --with-systemdsystemunitdir='$${libdir}/systemd/system-distcheck' diff --git a/configure.ac b/configure.ac index 2b57a72..41cd6a2 100644 --- a/configure.ac +++ b/configure.ac @@ -86,5 +86,6 @@ pam/Makefile doc/Makefile doc/version.xml doc/dbus/Makefile +tests/Makefile po/Makefile.in ]) diff --git a/tests/Makefile.am b/tests/Makefile.am new file mode 100644 index 0000000..ca96c86 --- /dev/null +++ b/tests/Makefile.am @@ -0,0 +1,9 @@ +TESTS_ENVIRONMENT = export FPRINT_BUILD_DIR=$(abs_top_builddir)/src; export TOPSRCDIR=$(abs_top_srcdir); export PYTHON=@PYTHON@; +TESTS = fprintd.py + +EXTRA_DIST = \ + $(TESTS) \ + prints/README \ + prints/*.png \ + prints/*.jpg + diff --git a/tests/fprintd.py b/tests/fprintd.py new file mode 100755 index 0000000..6069c9d --- /dev/null +++ b/tests/fprintd.py @@ -0,0 +1,466 @@ +#! /usr/bin/env python3 +# Copyright © 2017, 2019 Red Hat, Inc +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . +# Authors: +# Christian J. Kellner +# Benjamin Berg + +import unittest +import time +import subprocess +import os +import os.path +import sys +import tempfile +import glob +import shutil +import socket +import struct +import dbusmock +import gi +from gi.repository import GLib, Gio +import cairo + +try: + from subprocess import DEVNULL +except ImportError: + DEVNULL = open(os.devnull, 'wb') + +SERVICE_FILE = '/usr/share/dbus-1/system-services/net.reactivated.Fprint.service' + +def get_timeout(topic='default'): + vals = { + 'valgrind': { + 'test': 300, + 'default': 20, + 'daemon_start': 60 + }, + 'default': { + 'test': 60, + 'default': 3, + 'daemon_start': 5 + } + } + + valgrind = os.getenv('VALGRIND') + lut = vals['valgrind' if valgrind is not None else 'default'] + if topic not in lut: + raise ValueError('invalid topic') + return lut[topic] + + +# Copied from libfprint tests +class Connection: + + def __init__(self, addr): + self.addr = addr + + def __enter__(self): + self.con = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.con.connect(self.addr) + return self.con + + def __exit__(self, exc_type, exc_val, exc_tb): + self.con.close() + del self.con + +def load_image(img): + png = cairo.ImageSurface.create_from_png(img) + + # Cairo wants 4 byte aligned rows, so just add a few pixel if necessary + w = png.get_width() + h = png.get_height() + w = (w + 3) // 4 * 4 + h = (h + 3) // 4 * 4 + img = cairo.ImageSurface(cairo.Format.A8, w, h) + cr = cairo.Context(img) + + cr.set_source_rgba(1, 1, 1, 1) + cr.paint() + + cr.set_source_rgba(0, 0, 0, 0) + cr.set_operator(cairo.OPERATOR_SOURCE) + + cr.set_source_surface(png) + cr.paint() + + return img + +if hasattr(os.environ, 'TOPSRCDIR'): + root = os.environ['TOPSRCDIR'] +else: + root = os.path.join(os.path.dirname(__file__), '..') + +imgdir = os.path.join(root, 'tests', 'prints') + +ctx = GLib.main_context_default() + +class FPrintdTest(dbusmock.DBusTestCase): + + @staticmethod + def path_from_service_file(sf): + with open(SERVICE_FILE) as f: + for line in f: + if not line.startswith('Exec='): + continue + return line.split('=', 1)[1].strip() + return None + + @classmethod + def setUpClass(cls): + fprintd = None + + if 'FPRINT_BUILD_DIR' in os.environ: + print('Testing local build') + build_dir = os.environ['FPRINT_BUILD_DIR'] + fprintd = os.path.join(build_dir, 'fprintd') + elif 'UNDER_JHBUILD' in os.environ: + print('Testing JHBuild version') + jhbuild_prefix = os.environ['JHBUILD_PREFIX'] + fprintd = os.path.join(jhbuild_prefix, 'libexec', 'fprintd') + else: + print('Testing installed system binaries') + fprintd = cls.path_from_service_file(SERVICE_FILE) + + assert fprintd is not None, 'failed to find daemon' + cls.paths = {'daemon': fprintd } + + + cls.tmpdir = tempfile.mkdtemp(prefix='libfprint-') + + cls.sockaddr = os.path.join(cls.tmpdir, 'virtual-image.socket') + os.environ['FP_VIRTUAL_IMAGE'] = cls.sockaddr + + cls.prints = {} + for f in glob.glob(os.path.join(imgdir, '*.png')): + n = os.path.basename(f)[:-4] + cls.prints[n] = load_image(f) + + + cls.test_bus = Gio.TestDBus.new(Gio.TestDBusFlags.NONE) + cls.test_bus.up() + try: + del os.environ['DBUS_SESSION_BUS_ADDRESS'] + except KeyError: + pass + os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = cls.test_bus.get_bus_address() + cls.dbus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None) + + @classmethod + def tearDownClass(cls): + cls.test_bus.down() + shutil.rmtree(cls.tmpdir) + dbusmock.DBusTestCase.tearDownClass() + + + def daemon_start(self): + timeout = get_timeout('daemon_start') # seconds + env = os.environ.copy() + env['G_DEBUG'] = 'fatal-criticals' + env['STATE_DIRECTORY'] = self.state_dir + env['RUNTIME_DIRECTORY'] = self.run_dir + + argv = [self.paths['daemon'], '-t'] + valgrind = os.getenv('VALGRIND') + if valgrind is not None: + argv.insert(0, 'valgrind') + argv.insert(1, '--leak-check=full') + if os.path.exists(valgrind): + argv.insert(2, '--suppressions=%s' % valgrind) + self.valgrind = True + self.daemon = subprocess.Popen(argv, + env=env, + stdout=None, + stderr=subprocess.STDOUT) + self.device = None + + timeout_count = timeout * 10 + timeout_sleep = 0.1 + while timeout_count > 0: + time.sleep(timeout_sleep) + timeout_count -= 1 + try: + self.manager = Gio.DBusProxy.new_sync(self.dbus, + Gio.DBusProxyFlags.DO_NOT_AUTO_START, + None, + 'net.reactivated.Fprint', + '/net/reactivated/Fprint/Manager', + 'net.reactivated.Fprint.Manager', + None) + + devices = self.manager.GetDevices() + # Find the virtual device, just in case it is a local run + # and there is another usable sensor available locally + for path in devices: + dev = Gio.DBusProxy.new_sync(self.dbus, + Gio.DBusProxyFlags.DO_NOT_AUTO_START, + None, + 'net.reactivated.Fprint', + path, + 'net.reactivated.Fprint.Device', + None) + + if 'Virtual image device' in str(dev.get_cached_property('name')): + self.device = dev + break + else: + print('Did not find virtual device! Probably libfprint was build without the corresponding driver!') + + break + except GLib.GError: + pass + else: + timeout_time = timeout * 10 * timeout_sleep + self.fail('daemon did not start in %d seconds' % timeout_time) + + def daemon_stop(self): + + if self.daemon: + try: + self.daemon.terminate() + except OSError: + pass + self.daemon.wait() + + self.daemon = None + self.client = None + + def polkitd_start(self): + self._polkitd, self._polkitd_obj = self.spawn_server_template( + 'polkitd', {}, stdout=DEVNULL) + + def polkitd_stop(self): + if self._polkitd is None: + return + self._polkitd.terminate() + self._polkitd.wait() + + + + def setUp(self): + self.test_dir = tempfile.mkdtemp() + self.state_dir = os.path.join(self.test_dir, 'state') + self.run_dir = os.path.join(self.test_dir, 'run') + + def tearDown(self): + shutil.rmtree(self.test_dir) + + # From libfprint tests + def send_retry(self, retry_error=1): + # The default (1) is too-short + with Connection(self.sockaddr) as con: + con.sendall(struct.pack('ii', -1, retry_error)) + + # From libfprint tests + def send_image(self, image): + img = self.prints[image] + with Connection(self.sockaddr) as con: + mem = img.get_data() + mem = mem.tobytes() + assert len(mem) == img.get_width() * img.get_height() + + encoded_img = struct.pack('ii', img.get_width(), img.get_height()) + encoded_img += mem + + con.sendall(encoded_img) + + def test_enroll_verify_delete(self): + self.polkitd_start() + self.daemon_start() + + if self.device is None: + self.daemon_stop() + self.polkitd_stop() + self.skipTest("Need virtual_image device to run the test") + + def timeout_cb(*args): + # Note: With meson we could just rely on it to kill us + print("Test timed out, hard exiting") + sys.exit(1) + + timeout = GLib.timeout_add(get_timeout('test') * 1000, timeout_cb) + + self._polkitd_obj.SetAllowed(['net.reactivated.fprint.device.setusername', + 'net.reactivated.fprint.device.enroll', + 'net.reactivated.fprint.device.verify']) + + def signal_cb(proxy, sender, signal, params): + print(signal, params) + if signal == 'EnrollStatus': + self._abort = params[1] + self._last_result = params[0] + + if not self._abort and self._last_result == 'enroll-stage-passed': + self.send_image('whorl') + elif self._abort: + pass + else: + self._abort = True + self._last_result = 'Unexpected signal values' + print('Unexpected signal values') + elif signal == 'VerifyFingerSelected': + pass + elif signal == 'VerifyStatus': + self._abort = True + self._last_result = params[0] + self._verify_stopped = params[1] + else: + self._abort = True + self._last_result = 'Unexpected signal' + + signal_id = self.device.connect('g-signal', signal_cb) + + self.device.Claim('(s)', 'testuser') + + self.device.EnrollStart('(s)', 'right-index-finger') + + self.send_image('whorl') + + self._abort = False + while not self._abort: + ctx.iteration(True) + + assert self._last_result == 'enroll-completed' + + self.device.EnrollStop() + + assert os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7')) + + # Finger is enrolled, try to verify it + self.device.VerifyStart('(s)', 'any') + + # Try a wrong print; will stop verification + self.send_image('tented_arch') + self._abort = False + while not self._abort: + ctx.iteration(True) + assert self._verify_stopped == True + assert self._last_result == 'verify-no-match' + + self.device.VerifyStop() + self.device.VerifyStart('(s)', 'any') + + # Send a retry error (swipe too short); will not stop verification + self.send_retry() + self._abort = False + while not self._abort: + ctx.iteration(True) + assert self._verify_stopped == False + assert self._last_result == 'verify-swipe-too-short' + + # Try the correct print; will stop verification + self.send_image('whorl') + self._abort = False + while not self._abort: + ctx.iteration(True) + assert self._verify_stopped == True + assert self._last_result == 'verify-match' + + + # And delete the print(s) again + self.device.DeleteEnrolledFingers('(s)', 'testuser') + + assert not os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7')) + + GLib.source_remove(timeout) + + self.device.disconnect(signal_id) + + self.device.Release() + self.daemon_stop() + self.polkitd_stop() + + def test_enroll_delete2(self): + self.polkitd_start() + self.daemon_start() + + if self.device is None: + self.daemon_stop() + self.polkitd_stop() + self.skipTest("Need virtual_image device to run the test") + + def timeout_cb(*args): + # Note: With meson we could just rely on it to kill us + print("Test timed out, hard exiting") + sys.exit(1) + + timeout = GLib.timeout_add(get_timeout('test') * 1000, timeout_cb) + + self._polkitd_obj.SetAllowed(['net.reactivated.fprint.device.setusername', + 'net.reactivated.fprint.device.enroll', + 'net.reactivated.fprint.device.verify']) + + def signal_cb(proxy, sender, signal, params): + print(signal, params) + if signal == 'EnrollStatus': + self._abort = params[1] + self._last_result = params[0] + + if not self._abort and self._last_result == 'enroll-stage-passed': + self.send_image('whorl') + elif self._abort: + pass + else: + self._abort = True + self._last_result = 'Unexpected signal values' + print('Unexpected signal values') + elif signal == 'VerifyFingerSelected': + pass + elif signal == 'VerifyStatus': + self._abort = True + self._last_result = params[0] + self._verify_stopped = params[1] + else: + self._abort = True + self._last_result = 'Unexpected signal' + + signal_id = self.device.connect('g-signal', signal_cb) + + self.device.Claim('(s)', 'testuser') + + self.device.EnrollStart('(s)', 'right-index-finger') + + self.send_image('whorl') + + self._abort = False + while not self._abort: + ctx.iteration(True) + + assert self._last_result == 'enroll-completed' + + self.device.EnrollStop() + + assert os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7')) + + # And delete the print(s) again using the new API + self.device.DeleteEnrolledFingers2() + + assert not os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7')) + + GLib.source_remove(timeout) + + self.device.disconnect(signal_id) + + self.device.Release() + self.daemon_stop() + self.polkitd_stop() + +if __name__ == '__main__': + if len(sys.argv) == 2 and sys.argv[1] == "list-tests": + for machine, human in list_tests(): + print("%s %s" % (machine, human), end="\n") + sys.exit(0) + + unittest.main(verbosity=2)