mirror of
https://gitlab.com/mishakmak/pam-fprint-grosshack.git
synced 2026-04-08 20:03:34 +02:00
tests: Add basic integration test
This test uses the virtual image driver included in libfprint for testing.
This commit is contained in:
9
tests/Makefile.am
Normal file
9
tests/Makefile.am
Normal file
@ -0,0 +1,9 @@
|
||||
TESTS_ENVIRONMENT = export FPRINT_BUILD_DIR=$(abs_top_builddir)/src; export TOPSRCDIR=$(abs_top_srcdir); export PYTHON=@PYTHON@;
|
||||
TESTS = fprintd.py
|
||||
|
||||
EXTRA_DIST = \
|
||||
$(TESTS) \
|
||||
prints/README \
|
||||
prints/*.png \
|
||||
prints/*.jpg
|
||||
|
||||
466
tests/fprintd.py
Executable file
466
tests/fprintd.py
Executable file
@ -0,0 +1,466 @@
|
||||
#! /usr/bin/env python3
|
||||
# Copyright © 2017, 2019 Red Hat, Inc
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
# License as published by the Free Software Foundation; either
|
||||
# version 2.1 of the License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
# Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public
|
||||
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
|
||||
# Authors:
|
||||
# Christian J. Kellner <christian@kellner.me>
|
||||
# Benjamin Berg <bberg@redhat.com>
|
||||
|
||||
import unittest
|
||||
import time
|
||||
import subprocess
|
||||
import os
|
||||
import os.path
|
||||
import sys
|
||||
import tempfile
|
||||
import glob
|
||||
import shutil
|
||||
import socket
|
||||
import struct
|
||||
import dbusmock
|
||||
import gi
|
||||
from gi.repository import GLib, Gio
|
||||
import cairo
|
||||
|
||||
try:
|
||||
from subprocess import DEVNULL
|
||||
except ImportError:
|
||||
DEVNULL = open(os.devnull, 'wb')
|
||||
|
||||
SERVICE_FILE = '/usr/share/dbus-1/system-services/net.reactivated.Fprint.service'
|
||||
|
||||
def get_timeout(topic='default'):
|
||||
vals = {
|
||||
'valgrind': {
|
||||
'test': 300,
|
||||
'default': 20,
|
||||
'daemon_start': 60
|
||||
},
|
||||
'default': {
|
||||
'test': 60,
|
||||
'default': 3,
|
||||
'daemon_start': 5
|
||||
}
|
||||
}
|
||||
|
||||
valgrind = os.getenv('VALGRIND')
|
||||
lut = vals['valgrind' if valgrind is not None else 'default']
|
||||
if topic not in lut:
|
||||
raise ValueError('invalid topic')
|
||||
return lut[topic]
|
||||
|
||||
|
||||
# Copied from libfprint tests
|
||||
class Connection:
|
||||
|
||||
def __init__(self, addr):
|
||||
self.addr = addr
|
||||
|
||||
def __enter__(self):
|
||||
self.con = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.con.connect(self.addr)
|
||||
return self.con
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.con.close()
|
||||
del self.con
|
||||
|
||||
def load_image(img):
|
||||
png = cairo.ImageSurface.create_from_png(img)
|
||||
|
||||
# Cairo wants 4 byte aligned rows, so just add a few pixel if necessary
|
||||
w = png.get_width()
|
||||
h = png.get_height()
|
||||
w = (w + 3) // 4 * 4
|
||||
h = (h + 3) // 4 * 4
|
||||
img = cairo.ImageSurface(cairo.Format.A8, w, h)
|
||||
cr = cairo.Context(img)
|
||||
|
||||
cr.set_source_rgba(1, 1, 1, 1)
|
||||
cr.paint()
|
||||
|
||||
cr.set_source_rgba(0, 0, 0, 0)
|
||||
cr.set_operator(cairo.OPERATOR_SOURCE)
|
||||
|
||||
cr.set_source_surface(png)
|
||||
cr.paint()
|
||||
|
||||
return img
|
||||
|
||||
if hasattr(os.environ, 'TOPSRCDIR'):
|
||||
root = os.environ['TOPSRCDIR']
|
||||
else:
|
||||
root = os.path.join(os.path.dirname(__file__), '..')
|
||||
|
||||
imgdir = os.path.join(root, 'tests', 'prints')
|
||||
|
||||
ctx = GLib.main_context_default()
|
||||
|
||||
class FPrintdTest(dbusmock.DBusTestCase):
|
||||
|
||||
@staticmethod
|
||||
def path_from_service_file(sf):
|
||||
with open(SERVICE_FILE) as f:
|
||||
for line in f:
|
||||
if not line.startswith('Exec='):
|
||||
continue
|
||||
return line.split('=', 1)[1].strip()
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
fprintd = None
|
||||
|
||||
if 'FPRINT_BUILD_DIR' in os.environ:
|
||||
print('Testing local build')
|
||||
build_dir = os.environ['FPRINT_BUILD_DIR']
|
||||
fprintd = os.path.join(build_dir, 'fprintd')
|
||||
elif 'UNDER_JHBUILD' in os.environ:
|
||||
print('Testing JHBuild version')
|
||||
jhbuild_prefix = os.environ['JHBUILD_PREFIX']
|
||||
fprintd = os.path.join(jhbuild_prefix, 'libexec', 'fprintd')
|
||||
else:
|
||||
print('Testing installed system binaries')
|
||||
fprintd = cls.path_from_service_file(SERVICE_FILE)
|
||||
|
||||
assert fprintd is not None, 'failed to find daemon'
|
||||
cls.paths = {'daemon': fprintd }
|
||||
|
||||
|
||||
cls.tmpdir = tempfile.mkdtemp(prefix='libfprint-')
|
||||
|
||||
cls.sockaddr = os.path.join(cls.tmpdir, 'virtual-image.socket')
|
||||
os.environ['FP_VIRTUAL_IMAGE'] = cls.sockaddr
|
||||
|
||||
cls.prints = {}
|
||||
for f in glob.glob(os.path.join(imgdir, '*.png')):
|
||||
n = os.path.basename(f)[:-4]
|
||||
cls.prints[n] = load_image(f)
|
||||
|
||||
|
||||
cls.test_bus = Gio.TestDBus.new(Gio.TestDBusFlags.NONE)
|
||||
cls.test_bus.up()
|
||||
try:
|
||||
del os.environ['DBUS_SESSION_BUS_ADDRESS']
|
||||
except KeyError:
|
||||
pass
|
||||
os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = cls.test_bus.get_bus_address()
|
||||
cls.dbus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.test_bus.down()
|
||||
shutil.rmtree(cls.tmpdir)
|
||||
dbusmock.DBusTestCase.tearDownClass()
|
||||
|
||||
|
||||
def daemon_start(self):
|
||||
timeout = get_timeout('daemon_start') # seconds
|
||||
env = os.environ.copy()
|
||||
env['G_DEBUG'] = 'fatal-criticals'
|
||||
env['STATE_DIRECTORY'] = self.state_dir
|
||||
env['RUNTIME_DIRECTORY'] = self.run_dir
|
||||
|
||||
argv = [self.paths['daemon'], '-t']
|
||||
valgrind = os.getenv('VALGRIND')
|
||||
if valgrind is not None:
|
||||
argv.insert(0, 'valgrind')
|
||||
argv.insert(1, '--leak-check=full')
|
||||
if os.path.exists(valgrind):
|
||||
argv.insert(2, '--suppressions=%s' % valgrind)
|
||||
self.valgrind = True
|
||||
self.daemon = subprocess.Popen(argv,
|
||||
env=env,
|
||||
stdout=None,
|
||||
stderr=subprocess.STDOUT)
|
||||
self.device = None
|
||||
|
||||
timeout_count = timeout * 10
|
||||
timeout_sleep = 0.1
|
||||
while timeout_count > 0:
|
||||
time.sleep(timeout_sleep)
|
||||
timeout_count -= 1
|
||||
try:
|
||||
self.manager = Gio.DBusProxy.new_sync(self.dbus,
|
||||
Gio.DBusProxyFlags.DO_NOT_AUTO_START,
|
||||
None,
|
||||
'net.reactivated.Fprint',
|
||||
'/net/reactivated/Fprint/Manager',
|
||||
'net.reactivated.Fprint.Manager',
|
||||
None)
|
||||
|
||||
devices = self.manager.GetDevices()
|
||||
# Find the virtual device, just in case it is a local run
|
||||
# and there is another usable sensor available locally
|
||||
for path in devices:
|
||||
dev = Gio.DBusProxy.new_sync(self.dbus,
|
||||
Gio.DBusProxyFlags.DO_NOT_AUTO_START,
|
||||
None,
|
||||
'net.reactivated.Fprint',
|
||||
path,
|
||||
'net.reactivated.Fprint.Device',
|
||||
None)
|
||||
|
||||
if 'Virtual image device' in str(dev.get_cached_property('name')):
|
||||
self.device = dev
|
||||
break
|
||||
else:
|
||||
print('Did not find virtual device! Probably libfprint was build without the corresponding driver!')
|
||||
|
||||
break
|
||||
except GLib.GError:
|
||||
pass
|
||||
else:
|
||||
timeout_time = timeout * 10 * timeout_sleep
|
||||
self.fail('daemon did not start in %d seconds' % timeout_time)
|
||||
|
||||
def daemon_stop(self):
|
||||
|
||||
if self.daemon:
|
||||
try:
|
||||
self.daemon.terminate()
|
||||
except OSError:
|
||||
pass
|
||||
self.daemon.wait()
|
||||
|
||||
self.daemon = None
|
||||
self.client = None
|
||||
|
||||
def polkitd_start(self):
|
||||
self._polkitd, self._polkitd_obj = self.spawn_server_template(
|
||||
'polkitd', {}, stdout=DEVNULL)
|
||||
|
||||
def polkitd_stop(self):
|
||||
if self._polkitd is None:
|
||||
return
|
||||
self._polkitd.terminate()
|
||||
self._polkitd.wait()
|
||||
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.test_dir = tempfile.mkdtemp()
|
||||
self.state_dir = os.path.join(self.test_dir, 'state')
|
||||
self.run_dir = os.path.join(self.test_dir, 'run')
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.test_dir)
|
||||
|
||||
# From libfprint tests
|
||||
def send_retry(self, retry_error=1):
|
||||
# The default (1) is too-short
|
||||
with Connection(self.sockaddr) as con:
|
||||
con.sendall(struct.pack('ii', -1, retry_error))
|
||||
|
||||
# From libfprint tests
|
||||
def send_image(self, image):
|
||||
img = self.prints[image]
|
||||
with Connection(self.sockaddr) as con:
|
||||
mem = img.get_data()
|
||||
mem = mem.tobytes()
|
||||
assert len(mem) == img.get_width() * img.get_height()
|
||||
|
||||
encoded_img = struct.pack('ii', img.get_width(), img.get_height())
|
||||
encoded_img += mem
|
||||
|
||||
con.sendall(encoded_img)
|
||||
|
||||
def test_enroll_verify_delete(self):
|
||||
self.polkitd_start()
|
||||
self.daemon_start()
|
||||
|
||||
if self.device is None:
|
||||
self.daemon_stop()
|
||||
self.polkitd_stop()
|
||||
self.skipTest("Need virtual_image device to run the test")
|
||||
|
||||
def timeout_cb(*args):
|
||||
# Note: With meson we could just rely on it to kill us
|
||||
print("Test timed out, hard exiting")
|
||||
sys.exit(1)
|
||||
|
||||
timeout = GLib.timeout_add(get_timeout('test') * 1000, timeout_cb)
|
||||
|
||||
self._polkitd_obj.SetAllowed(['net.reactivated.fprint.device.setusername',
|
||||
'net.reactivated.fprint.device.enroll',
|
||||
'net.reactivated.fprint.device.verify'])
|
||||
|
||||
def signal_cb(proxy, sender, signal, params):
|
||||
print(signal, params)
|
||||
if signal == 'EnrollStatus':
|
||||
self._abort = params[1]
|
||||
self._last_result = params[0]
|
||||
|
||||
if not self._abort and self._last_result == 'enroll-stage-passed':
|
||||
self.send_image('whorl')
|
||||
elif self._abort:
|
||||
pass
|
||||
else:
|
||||
self._abort = True
|
||||
self._last_result = 'Unexpected signal values'
|
||||
print('Unexpected signal values')
|
||||
elif signal == 'VerifyFingerSelected':
|
||||
pass
|
||||
elif signal == 'VerifyStatus':
|
||||
self._abort = True
|
||||
self._last_result = params[0]
|
||||
self._verify_stopped = params[1]
|
||||
else:
|
||||
self._abort = True
|
||||
self._last_result = 'Unexpected signal'
|
||||
|
||||
signal_id = self.device.connect('g-signal', signal_cb)
|
||||
|
||||
self.device.Claim('(s)', 'testuser')
|
||||
|
||||
self.device.EnrollStart('(s)', 'right-index-finger')
|
||||
|
||||
self.send_image('whorl')
|
||||
|
||||
self._abort = False
|
||||
while not self._abort:
|
||||
ctx.iteration(True)
|
||||
|
||||
assert self._last_result == 'enroll-completed'
|
||||
|
||||
self.device.EnrollStop()
|
||||
|
||||
assert os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7'))
|
||||
|
||||
# Finger is enrolled, try to verify it
|
||||
self.device.VerifyStart('(s)', 'any')
|
||||
|
||||
# Try a wrong print; will stop verification
|
||||
self.send_image('tented_arch')
|
||||
self._abort = False
|
||||
while not self._abort:
|
||||
ctx.iteration(True)
|
||||
assert self._verify_stopped == True
|
||||
assert self._last_result == 'verify-no-match'
|
||||
|
||||
self.device.VerifyStop()
|
||||
self.device.VerifyStart('(s)', 'any')
|
||||
|
||||
# Send a retry error (swipe too short); will not stop verification
|
||||
self.send_retry()
|
||||
self._abort = False
|
||||
while not self._abort:
|
||||
ctx.iteration(True)
|
||||
assert self._verify_stopped == False
|
||||
assert self._last_result == 'verify-swipe-too-short'
|
||||
|
||||
# Try the correct print; will stop verification
|
||||
self.send_image('whorl')
|
||||
self._abort = False
|
||||
while not self._abort:
|
||||
ctx.iteration(True)
|
||||
assert self._verify_stopped == True
|
||||
assert self._last_result == 'verify-match'
|
||||
|
||||
|
||||
# And delete the print(s) again
|
||||
self.device.DeleteEnrolledFingers('(s)', 'testuser')
|
||||
|
||||
assert not os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7'))
|
||||
|
||||
GLib.source_remove(timeout)
|
||||
|
||||
self.device.disconnect(signal_id)
|
||||
|
||||
self.device.Release()
|
||||
self.daemon_stop()
|
||||
self.polkitd_stop()
|
||||
|
||||
def test_enroll_delete2(self):
|
||||
self.polkitd_start()
|
||||
self.daemon_start()
|
||||
|
||||
if self.device is None:
|
||||
self.daemon_stop()
|
||||
self.polkitd_stop()
|
||||
self.skipTest("Need virtual_image device to run the test")
|
||||
|
||||
def timeout_cb(*args):
|
||||
# Note: With meson we could just rely on it to kill us
|
||||
print("Test timed out, hard exiting")
|
||||
sys.exit(1)
|
||||
|
||||
timeout = GLib.timeout_add(get_timeout('test') * 1000, timeout_cb)
|
||||
|
||||
self._polkitd_obj.SetAllowed(['net.reactivated.fprint.device.setusername',
|
||||
'net.reactivated.fprint.device.enroll',
|
||||
'net.reactivated.fprint.device.verify'])
|
||||
|
||||
def signal_cb(proxy, sender, signal, params):
|
||||
print(signal, params)
|
||||
if signal == 'EnrollStatus':
|
||||
self._abort = params[1]
|
||||
self._last_result = params[0]
|
||||
|
||||
if not self._abort and self._last_result == 'enroll-stage-passed':
|
||||
self.send_image('whorl')
|
||||
elif self._abort:
|
||||
pass
|
||||
else:
|
||||
self._abort = True
|
||||
self._last_result = 'Unexpected signal values'
|
||||
print('Unexpected signal values')
|
||||
elif signal == 'VerifyFingerSelected':
|
||||
pass
|
||||
elif signal == 'VerifyStatus':
|
||||
self._abort = True
|
||||
self._last_result = params[0]
|
||||
self._verify_stopped = params[1]
|
||||
else:
|
||||
self._abort = True
|
||||
self._last_result = 'Unexpected signal'
|
||||
|
||||
signal_id = self.device.connect('g-signal', signal_cb)
|
||||
|
||||
self.device.Claim('(s)', 'testuser')
|
||||
|
||||
self.device.EnrollStart('(s)', 'right-index-finger')
|
||||
|
||||
self.send_image('whorl')
|
||||
|
||||
self._abort = False
|
||||
while not self._abort:
|
||||
ctx.iteration(True)
|
||||
|
||||
assert self._last_result == 'enroll-completed'
|
||||
|
||||
self.device.EnrollStop()
|
||||
|
||||
assert os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7'))
|
||||
|
||||
# And delete the print(s) again using the new API
|
||||
self.device.DeleteEnrolledFingers2()
|
||||
|
||||
assert not os.path.exists(os.path.join(self.state_dir, 'testuser/virtual_image/0/7'))
|
||||
|
||||
GLib.source_remove(timeout)
|
||||
|
||||
self.device.disconnect(signal_id)
|
||||
|
||||
self.device.Release()
|
||||
self.daemon_stop()
|
||||
self.polkitd_stop()
|
||||
|
||||
if __name__ == '__main__':
|
||||
if len(sys.argv) == 2 and sys.argv[1] == "list-tests":
|
||||
for machine, human in list_tests():
|
||||
print("%s %s" % (machine, human), end="\n")
|
||||
sys.exit(0)
|
||||
|
||||
unittest.main(verbosity=2)
|
||||
Reference in New Issue
Block a user