mirror of
https://gitlab.com/mishakmak/pam-fprint-grosshack.git
synced 2026-04-08 20:03:34 +02:00
dbusmock/polkitd: Add ability to simulate call hangs or delays
Added various methods that allow to make methods to delay to return a value, both by using timing functions and using a way to manually stop and restart the calls. This is mostly done using async callbacks in dbus methods
This commit is contained in:
committed by
Benjamin Berg
parent
110c0018a2
commit
9d6c7eb1a9
@ -18,6 +18,7 @@ __copyright__ = '(c) 2020 Canonical Ltd.'
|
||||
__license__ = 'LGPL 3+'
|
||||
|
||||
import dbus
|
||||
import time
|
||||
|
||||
from dbusmock import MOCK_IFACE, mockobject
|
||||
|
||||
@ -32,6 +33,10 @@ def load(mock, parameters):
|
||||
# default state
|
||||
polkitd.allow_unknown = False
|
||||
polkitd.allowed = []
|
||||
polkitd.delay = 0
|
||||
polkitd.simulate_hang = False
|
||||
polkitd.hanging_actions = []
|
||||
polkitd.hanging_calls = []
|
||||
|
||||
mock.AddProperties(MAIN_IFACE,
|
||||
dbus.Dictionary({
|
||||
@ -42,10 +47,18 @@ def load(mock, parameters):
|
||||
|
||||
|
||||
@dbus.service.method(MAIN_IFACE,
|
||||
in_signature='(sa{sv})sa{ss}us', out_signature='(bba{ss})')
|
||||
def CheckAuthorization(self, subject, action_id, details, flags, cancellation_id):
|
||||
return (action_id in self.allowed or self.allow_unknown, False, {'test': 'test'})
|
||||
in_signature='(sa{sv})sa{ss}us', out_signature='(bba{ss})',
|
||||
async_callbacks=('ok_cb', 'err_cb'))
|
||||
def CheckAuthorization(self, subject, action_id, details, flags, cancellation_id,
|
||||
ok_cb, err_cb):
|
||||
time.sleep(self.delay)
|
||||
allowed = action_id in self.allowed or self.allow_unknown
|
||||
ret = (allowed, False, {'test': 'test'})
|
||||
|
||||
if self.simulate_hang or action_id in self.hanging_actions:
|
||||
self.hanging_calls.append((ok_cb, ret))
|
||||
else:
|
||||
ok_cb(ret)
|
||||
|
||||
@dbus.service.method(MOCK_IFACE, in_signature='b', out_signature='')
|
||||
def AllowUnknown(self, default):
|
||||
@ -56,6 +69,32 @@ def AllowUnknown(self, default):
|
||||
'''
|
||||
self.allow_unknown = default
|
||||
|
||||
@dbus.service.method(MOCK_IFACE, in_signature='d', out_signature='')
|
||||
def SetDelay(self, delay):
|
||||
'''Makes the CheckAuthorization() method to delay'''
|
||||
self.delay = delay
|
||||
|
||||
@dbus.service.method(MOCK_IFACE, in_signature='b', out_signature='')
|
||||
def SimulateHang(self, hang):
|
||||
'''Makes the CheckAuthorization() method to hang'''
|
||||
self.simulate_hang = hang
|
||||
|
||||
@dbus.service.method(MOCK_IFACE, in_signature='as', out_signature='')
|
||||
def SimulateHangActions(self, actions):
|
||||
'''Makes the CheckAuthorization() method to hang on such actions'''
|
||||
self.hanging_actions = actions
|
||||
|
||||
@dbus.service.method(MOCK_IFACE, in_signature='', out_signature='')
|
||||
def ReleaseHangingCalls(self):
|
||||
'''Calls all the hanging callbacks'''
|
||||
for (cb, ret) in self.hanging_calls:
|
||||
cb(ret)
|
||||
self.hanging_calls = []
|
||||
|
||||
@dbus.service.method(MOCK_IFACE, in_signature='', out_signature='b')
|
||||
def HaveHangingCalls(self):
|
||||
'''Check if we've hangling calls'''
|
||||
return len(self.hanging_calls)
|
||||
|
||||
@dbus.service.method(MOCK_IFACE, in_signature='as', out_signature='')
|
||||
def SetAllowed(self, actions):
|
||||
|
||||
Reference in New Issue
Block a user