import re
from subprocess import call, check_call, check_output, CalledProcessError
from fstest_defs import *
+import u_boot_utils as util
supported_fs_basic = ['fat16', 'fat32', 'ext4']
supported_fs_ext = ['fat16', 'fat32']
global fuse_mounted
try:
- check_call('guestmount -a %s -m /dev/sda %s'
+ check_call('guestmount --pid-file guestmount.pid -a %s -m /dev/sda %s'
% (device, mount_point), shell=True)
fuse_mounted = True
return
if fuse_mounted:
call('sync')
call('guestunmount %s' % mount_point, shell=True)
+
+ try:
+ with open("guestmount.pid", "r") as pidfile:
+ pid = int(pidfile.read())
+ util.waitpid(pid, kill=True)
+ os.remove("guestmount.pid")
+
+ except FileNotFoundError:
+ pass
+
else:
call('sudo umount %s' % mount_point, shell=True)
import os
import os.path
import pytest
+import signal
import sys
import time
import re
assert m, 'CRC32 operation failed.'
return m.group(1)
+
+def waitpid(pid, timeout=60, kill=False):
+ """Wait a process to terminate by its PID
+
+ This is an alternative to a os.waitpid(pid, 0) call that works on
+ processes that aren't children of the python process.
+
+ Args:
+ pid: PID of a running process.
+ timeout: Time in seconds to wait.
+ kill: Whether to forcibly kill the process after timeout.
+
+ Returns:
+ True, if the process ended on its own.
+ False, if the process was killed by this function.
+
+ Raises:
+ TimeoutError, if the process is still running after timeout.
+ """
+ try:
+ for _ in range(timeout):
+ os.kill(pid, 0)
+ time.sleep(1)
+
+ if kill:
+ os.kill(pid, signal.SIGKILL)
+ return False
+
+ except ProcessLookupError:
+ return True
+
+ raise TimeoutError(
+ "Process with PID {} did not terminate after {} seconds."
+ .format(pid, timeout)
+ )