diff --git a/Makefile.am b/Makefile.am
index ebec8a2..9abc242 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1,5 +1,5 @@
AUTOMAKE_OPTIONS = dist-bzip2
-SUBDIRS = src data utils pam doc po
+SUBDIRS = src data utils pam doc tests po
EXTRA_DIST = TODO intltool-extract.in intltool-merge.in intltool-update.in
DISTCHECK_CONFIGURE_FLAGS = --enable-gtk-doc --with-systemdsystemunitdir='$${libdir}/systemd/system-distcheck'
diff --git a/configure.ac b/configure.ac
index 2b57a72..41cd6a2 100644
--- a/configure.ac
+++ b/configure.ac
@@ -86,5 +86,6 @@ pam/Makefile
doc/Makefile
doc/version.xml
doc/dbus/Makefile
+tests/Makefile
po/Makefile.in
])
diff --git a/tests/Makefile.am b/tests/Makefile.am
new file mode 100644
index 0000000..ca96c86
--- /dev/null
+++ b/tests/Makefile.am
@@ -0,0 +1,9 @@
+TESTS_ENVIRONMENT = export FPRINT_BUILD_DIR=$(abs_top_builddir)/src; export TOPSRCDIR=$(abs_top_srcdir); export PYTHON=@PYTHON@;
+TESTS = fprintd.py
+
+EXTRA_DIST = \
+ $(TESTS) \
+ prints/README \
+ prints/*.png \
+ prints/*.jpg
+
diff --git a/tests/fprintd.py b/tests/fprintd.py
new file mode 100755
index 0000000..6069c9d
--- /dev/null
+++ b/tests/fprintd.py
@@ -0,0 +1,466 @@
+#! /usr/bin/env python3
+# Copyright © 2017, 2019 Red Hat, Inc
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see .
+# Authors:
+# Christian J. Kellner
+# Benjamin Berg
+
+import unittest
+import time
+import subprocess
+import os
+import os.path
+import sys
+import tempfile
+import glob
+import shutil
+import socket
+import struct
+import dbusmock
+import gi
+from gi.repository import GLib, Gio
+import cairo
+
+try:
+ from subprocess import DEVNULL
+except ImportError:
+ DEVNULL = open(os.devnull, 'wb')
+
+SERVICE_FILE = '/usr/share/dbus-1/system-services/net.reactivated.Fprint.service'
+
+def get_timeout(topic='default'):
+ vals = {
+ 'valgrind': {
+ 'test': 300,
+ 'default': 20,
+ 'daemon_start': 60
+ },
+ 'default': {
+ 'test': 60,
+ 'default': 3,
+ 'daemon_start': 5
+ }
+ }
+
+ valgrind = os.getenv('VALGRIND')
+ lut = vals['valgrind' if valgrind is not None else 'default']
+ if topic not in lut:
+ raise ValueError('invalid topic')
+ return lut[topic]
+
+
+# Copied from libfprint tests
+class Connection:
+
+ def __init__(self, addr):
+ self.addr = addr
+
+ def __enter__(self):
+ self.con = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.con.connect(self.addr)
+ return self.con
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.con.close()
+ del self.con
+
+def load_image(img):
+ png = cairo.ImageSurface.create_from_png(img)
+
+ # Cairo wants 4 byte aligned rows, so just add a few pixel if necessary
+ w = png.get_width()
+ h = png.get_height()
+ w = (w + 3) // 4 * 4
+ h = (h + 3) // 4 * 4
+ img = cairo.ImageSurface(cairo.Format.A8, w, h)
+ cr = cairo.Context(img)
+
+ cr.set_source_rgba(1, 1, 1, 1)
+ cr.paint()
+
+ cr.set_source_rgba(0, 0, 0, 0)
+ cr.set_operator(cairo.OPERATOR_SOURCE)
+
+ cr.set_source_surface(png)
+ cr.paint()
+
+ return img
+
+if hasattr(os.environ, 'TOPSRCDIR'):
+ root = os.environ['TOPSRCDIR']
+else:
+ root = os.path.join(os.path.dirname(__file__), '..')
+
+imgdir = os.path.join(root, 'tests', 'prints')
+
+ctx = GLib.main_context_default()
+
+class FPrintdTest(dbusmock.DBusTestCase):
+
+ @staticmethod
+ def path_from_service_file(sf):
+ with open(SERVICE_FILE) as f:
+ for line in f:
+ if not line.startswith('Exec='):
+ continue
+ return line.split('=', 1)[1].strip()
+ return None
+
+ @classmethod
+ def setUpClass(cls):
+ fprintd = None
+
+ if 'FPRINT_BUILD_DIR' in os.environ:
+ print('Testing local build')
+ build_dir = os.environ['FPRINT_BUILD_DIR']
+ fprintd = os.path.join(build_dir, 'fprintd')
+ elif 'UNDER_JHBUILD' in os.environ:
+ print('Testing JHBuild version')
+ jhbuild_prefix = os.environ['JHBUILD_PREFIX']
+ fprintd = os.path.join(jhbuild_prefix, 'libexec', 'fprintd')
+ else:
+ print('Testing installed system binaries')
+ fprintd = cls.path_from_service_file(SERVICE_FILE)
+
+ assert fprintd is not None, 'failed to find daemon'
+ cls.paths = {'daemon': fprintd }
+
+
+ cls.tmpdir = tempfile.mkdtemp(prefix='libfprint-')
+
+ cls.sockaddr = os.path.join(cls.tmpdir, 'virtual-image.socket')
+ os.environ['FP_VIRTUAL_IMAGE'] = cls.sockaddr
+
+ cls.prints = {}
+ for f in glob.glob(os.path.join(imgdir, '*.png')):
+ n = os.path.basename(f)[:-4]
+ cls.prints[n] = load_image(f)
+
+
+ cls.test_bus = Gio.TestDBus.new(Gio.TestDBusFlags.NONE)
+ cls.test_bus.up()
+ try:
+ del os.environ['DBUS_SESSION_BUS_ADDRESS']
+ except KeyError:
+ pass
+ os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = cls.test_bus.get_bus_address()
+ cls.dbus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.test_bus.down()
+ shutil.rmtree(cls.tmpdir)
+ dbusmock.DBusTestCase.tearDownClass()
+
+
+ def daemon_start(self):
+ timeout = get_timeout('daemon_start') # seconds
+ env = os.environ.copy()
+ env['G_DEBUG'] = 'fatal-criticals'
+ env['STATE_DIRECTORY'] = self.state_dir
+ env['RUNTIME_DIRECTORY'] = self.run_dir
+
+ argv = [self.paths['daemon'], '-t']
+ valgrind = os.getenv('VALGRIND')
+ if valgrind is not None:
+ argv.insert(0, 'valgrind')
+ argv.insert(1, '--leak-check=full')
+ if os.path.exists(valgrind):
+ argv.insert(2, '--suppressions=%s' % valgrind)
+ self.valgrind = True
+ self.daemon = subprocess.Popen(argv,
+ env=env,
+ stdout=None,
+ stderr=subprocess.STDOUT)
+ self.device = None
+
+ timeout_count = timeout * 10
+ timeout_sleep = 0.1
+ while timeout_count > 0:
+ time.sleep(timeout_sleep)
+ timeout_count -= 1
+ try:
+ self.manager = Gio.DBusProxy.new_sync(self.dbus,
+ Gio.DBusProxyFlags.DO_NOT_AUTO_START,
+ None,
+ 'net.reactivated.Fprint',
+ '/net/reactivated/Fprint/Manager',
+ 'net.reactivated.Fprint.Manager',
+ None)
+
+ devices = self.manager.GetDevices()
+ # Find the virtual device, just in case it is a local run
+ # and there is another usable sensor available locally
+ for path in devices:
+ dev = Gio.DBusProxy.new_sync(self.dbus,
+ Gio.DBusProxyFlags.DO_NOT_AUTO_START,
+ None,
+ 'net.reactivated.Fprint',
+ path,
+ 'net.reactivated.Fprint.Device',
+ None)
+
+ if 'Virtual image device' in str(dev.get_cached_property('name')):
+ self.device = dev
+ break
+ else:
+ print('Did not find virtual device! Probably libfprint was build without the corresponding driver!')
+
+ break
+ except GLib.GError:
+ pass
+ else:
+ timeout_time = timeout * 10 * timeout_sleep
+ self.fail('daemon did not start in %d seconds' % timeout_time)
+
+ def daemon_stop(self):
+
+ if self.daemon:
+ try:
+ self.daemon.terminate()
+ except OSError:
+ pass
+ self.daemon.wait()
+
+ self.daemon = None
+ self.client = None
+
+ def polkitd_start(self):
+ self._polkitd, self._polkitd_obj = self.spawn_server_template(
+ 'polkitd', {}, stdout=DEVNULL)
+
+ def polkitd_stop(self):
+ if self._polkitd is None:
+ return
+ self._polkitd.terminate()
+ self._polkitd.wait()
+
+
+
+ def setUp(self):
+ self.test_dir = tempfile.mkdtemp()
+ self.state_dir = os.path.join(self.test_dir, 'state')
+ self.run_dir = os.path.join(self.test_dir, 'run')
+
+ def tearDown(self):
+ shutil.rmtree(self.test_dir)
+
+ # From libfprint tests
+ def send_retry(self, retry_error=1):
+ # The default (1) is too-short
+ with Connection(self.sockaddr) as con:
+ con.sendall(struct.pack('ii', -1, retry_error))
+
+ # From libfprint tests
+ def send_image(self, image):
+ img = self.prints[image]
+ with Connection(self.sockaddr) as con:
+ mem = img.get_data()
+ mem = mem.tobytes()
+ assert len(mem) == img.get_width() * img.get_height()
+
+ encoded_img = struct.pack('ii', img.get_width(), img.get_height())
+ encoded_img += mem
+
+ con.sendall(encoded_img)
+
+ def test_enroll_verify_delete(self):
+ self.polkitd_start()
+ self.daemon_start()
+
+ if self.device is None:
+ self.daemon_stop()
+ self.polkitd_stop()
+ self.skipTest("Need virtual_image device to run the test")
+
+ def timeout_cb(*args):
+ # Note: With meson we could just rely on it to kill us
+ print("Test timed out, hard exiting")
+ sys.exit(1)
+
+ timeout = GLib.timeout_add(get_timeout('test') * 1000, timeout_cb)
+
+ self._polkitd_obj.SetAllowed(['net.reactivated.fprint.device.setusername',
+ 'net.reactivated.fprint.device.enroll',
+ 'net.reactivated.fprint.device.verify'])
+
+ def signal_cb(proxy, sender, signal, params):
+ print(signal, params)
+ if signal == 'EnrollStatus':
+ self._abort = params[1]
+ self._last_result = params[0]
+
+ if not self._abort and self._last_result == 'enroll-stage-passed':
+ self.send_image('whorl')
+ elif self._abort:
+ pass
+ else:
+ self._abort = True
+ self._last_result = 'Unexpected signal values'
+ print('Unexpected signal values')
+ elif signal == 'VerifyFingerSelected':
+ pass
+ elif signal == 'VerifyStatus':
+ self._abort = True
+ self._last_result = params[0]
+ self._verify_stopped = params[1]
+ else:
+ self._abort = True
+ self._last_result = 'Unexpected signal'
+
+ signal_id = self.device.connect('g-signal', signal_cb)
+
+ self.device.Claim('(s)', 'testuser')
+
+ self.device.EnrollStart('(s)', 'right-index-finger')
+
+ self.send_image('whorl')
+
+ self._abort = False
+ while not self._abort:
+ ctx.iteration(True)
+
+ assert self._last_result == 'enroll-completed'
+
+ self.device.EnrollStop()
+
+ assert os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7'))
+
+ # Finger is enrolled, try to verify it
+ self.device.VerifyStart('(s)', 'any')
+
+ # Try a wrong print; will stop verification
+ self.send_image('tented_arch')
+ self._abort = False
+ while not self._abort:
+ ctx.iteration(True)
+ assert self._verify_stopped == True
+ assert self._last_result == 'verify-no-match'
+
+ self.device.VerifyStop()
+ self.device.VerifyStart('(s)', 'any')
+
+ # Send a retry error (swipe too short); will not stop verification
+ self.send_retry()
+ self._abort = False
+ while not self._abort:
+ ctx.iteration(True)
+ assert self._verify_stopped == False
+ assert self._last_result == 'verify-swipe-too-short'
+
+ # Try the correct print; will stop verification
+ self.send_image('whorl')
+ self._abort = False
+ while not self._abort:
+ ctx.iteration(True)
+ assert self._verify_stopped == True
+ assert self._last_result == 'verify-match'
+
+
+ # And delete the print(s) again
+ self.device.DeleteEnrolledFingers('(s)', 'testuser')
+
+ assert not os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7'))
+
+ GLib.source_remove(timeout)
+
+ self.device.disconnect(signal_id)
+
+ self.device.Release()
+ self.daemon_stop()
+ self.polkitd_stop()
+
+ def test_enroll_delete2(self):
+ self.polkitd_start()
+ self.daemon_start()
+
+ if self.device is None:
+ self.daemon_stop()
+ self.polkitd_stop()
+ self.skipTest("Need virtual_image device to run the test")
+
+ def timeout_cb(*args):
+ # Note: With meson we could just rely on it to kill us
+ print("Test timed out, hard exiting")
+ sys.exit(1)
+
+ timeout = GLib.timeout_add(get_timeout('test') * 1000, timeout_cb)
+
+ self._polkitd_obj.SetAllowed(['net.reactivated.fprint.device.setusername',
+ 'net.reactivated.fprint.device.enroll',
+ 'net.reactivated.fprint.device.verify'])
+
+ def signal_cb(proxy, sender, signal, params):
+ print(signal, params)
+ if signal == 'EnrollStatus':
+ self._abort = params[1]
+ self._last_result = params[0]
+
+ if not self._abort and self._last_result == 'enroll-stage-passed':
+ self.send_image('whorl')
+ elif self._abort:
+ pass
+ else:
+ self._abort = True
+ self._last_result = 'Unexpected signal values'
+ print('Unexpected signal values')
+ elif signal == 'VerifyFingerSelected':
+ pass
+ elif signal == 'VerifyStatus':
+ self._abort = True
+ self._last_result = params[0]
+ self._verify_stopped = params[1]
+ else:
+ self._abort = True
+ self._last_result = 'Unexpected signal'
+
+ signal_id = self.device.connect('g-signal', signal_cb)
+
+ self.device.Claim('(s)', 'testuser')
+
+ self.device.EnrollStart('(s)', 'right-index-finger')
+
+ self.send_image('whorl')
+
+ self._abort = False
+ while not self._abort:
+ ctx.iteration(True)
+
+ assert self._last_result == 'enroll-completed'
+
+ self.device.EnrollStop()
+
+ assert os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7'))
+
+ # And delete the print(s) again using the new API
+ self.device.DeleteEnrolledFingers2()
+
+ assert not os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7'))
+
+ GLib.source_remove(timeout)
+
+ self.device.disconnect(signal_id)
+
+ self.device.Release()
+ self.daemon_stop()
+ self.polkitd_stop()
+
+if __name__ == '__main__':
+ if len(sys.argv) == 2 and sys.argv[1] == "list-tests":
+ for machine, human in list_tests():
+ print("%s %s" % (machine, human), end="\n")
+ sys.exit(0)
+
+ unittest.main(verbosity=2)