Skip to content
Snippets Groups Projects
Commit 871cbc18 authored by root's avatar root Committed by Nico
Browse files

inital

parents
No related branches found
No related tags found
No related merge requests found
# Autoupdate Tester
This script tests the Gluon autoupdater by continuously updating a VM from one firmware version to the next one using the autoupdater.
Before starting the update, we create a snapshot. After a successful update, we roll back to that snapshot and start again. When an update fails, we leave a snapshot named `test_fail_<cycle>_%Y_%m_%d_%H_%M_%S` behind so you can investigate what went wrong.
## Setup
This script is meant to be run on a Proxmox host with a fully configured Gluon node as a VM. It uses a virtual serial port to communicate with the VM, so be sure to set one up. To verify it is working, run `qm terminal <vmid>` and press enter. You should see a root prompt.
In Gluon, set up the update channel as desired, but *disable* the autoupdater. The script will run `autoupdater -f`, so the autoupdater will run even when disabled.
Several things currently only work for Freifunk Stuttgart (for example the default firmware versions and the gateway address that is pinged), but it should be easy to adjust them to your community.
#!/usr/bin/python3
import argparse
import logging
import subprocess
import json
from datetime import datetime
import time
import pexpect
from contextlib import contextmanager
import re
import sys
# Command-line interface. The parsed result is stored in the module-level
# name `args`, which the helper functions below read directly.
ap = argparse.ArgumentParser()
ap.add_argument("vmid", help="Proxmox VM ID with Gluon Node")
ap.add_argument(
    "--updatecount",
    type=int,
    default=1000,
    help="Number of updates to perform",
)
ap.add_argument(
    "--debug",
    action="store_true",
    help="Enable debugging output",
)
ap.add_argument(
    "--before-version",
    default="2.0+2020-09-26-g.8547bd43-s.f16f34e",
    help="Version expected running before update",
)
ap.add_argument(
    "--after-version",
    default="2.1+2020-12-11-g.90d0e33c-s.de75272",
    help="Version expected running after update",
)
args = ap.parse_args()

# Logging is only configured explicitly when --debug was requested.
if args.debug:
    logging.basicConfig(level=logging.DEBUG)

# Pattern used to recognise a root shell prompt on the serial console
# (it matches text such as "root@host:~# ").
PROMPT_REGEX = r'root@[^ ]+:[^ ]+ '
def spawn_console():
    """Open the VM's serial console and wait for a root shell prompt.

    Spawns ``qm terminal <vmid>`` for the VM ID given on the command line,
    sends an empty line to provoke a prompt and blocks until ``PROMPT_REGEX``
    matches.

    Returns:
        The ``pexpect.spawn`` object attached to the console. The caller is
        responsible for closing it.

    Raises:
        Whatever ``pexpect`` raises when the prompt does not appear (for
        example ``pexpect.TIMEOUT`` or ``pexpect.EOF``). The spawned process
        is closed before the exception propagates.
    """
    terminal = pexpect.spawn("qm terminal {}".format(args.vmid), echo=False, maxread=20000)
    try:
        terminal.sendline('')
        terminal.expect(PROMPT_REGEX)
    except Exception:
        # Do not leave a dangling `qm terminal` process behind when the
        # console never shows a prompt.
        terminal.close()
        raise
    return terminal
def run_in_vm(cmd, asynchronous=False):
    """Run a command inside the VM through its serial console.

    The command is wrapped between ``CMD(`` and ``)`` markers printed by
    ``echo`` so that its output can be cut out of the console stream. Its exit
    status is stored in the shell variable ``exitcode`` and read back with a
    second ``echo`` command.

    Args:
        cmd: Sequence of strings; they are joined with single spaces to form
            the command line (no shell quoting is applied).
        asynchronous: When true, only wait until the opening ``CMD(`` marker
            was seen, then return without collecting output or exit code.

    Returns:
        ``{"exitcode": int, "out-data": str}`` for synchronous calls, an empty
        dict for asynchronous calls.

    Raises:
        ValueError: if the command output or the exit code could not be
            extracted from the console output.
        pexpect exceptions: if an expected pattern does not show up.
    """
    exitcode_regex = re.compile(r'^EXITCODE\(([0-9]+)\)', flags=re.MULTILINE)
    output_regex = re.compile(r'CMD\((.*)\)', flags=re.DOTALL)

    logging.debug("executing in Node: {}".format(cmd))
    terminal = spawn_console()
    try:
        # The backslashes escape the parentheses for the shell inside the VM.
        # Raw strings keep them literal without relying on Python's handling
        # of unknown escape sequences.
        # NOTE(review): presumably the escaped form also keeps an echoed
        # command line from matching the marker regexes - confirm.
        cmd_to_run = r'echo -n CMD\(; ' + ' '.join(cmd) + r'; exitcode=$?; echo -n \)'
        terminal.sendline(cmd_to_run)

        if asynchronous:
            # Only make sure the command has started, then detach.
            terminal.expect(r"CMD\(")
            return {}

        logging.debug("Waiting for command to return...")
        terminal.expect(PROMPT_REGEX)
        out_data_with_cmd = terminal.before.decode("utf-8")
        out_match = output_regex.search(out_data_with_cmd)
        if not out_match:
            logging.error("Could not extract command output before='{}'!".format(terminal.before))
            raise ValueError("Could not extract cmd output")
        out_data = out_match.group(1)

        # Ask the shell for the exit status saved by the wrapped command.
        terminal.sendline(r'echo EXITCODE\($exitcode\)')
        terminal.expect(PROMPT_REGEX)
        exitcode_match = exitcode_regex.search(terminal.before.decode("utf-8"))
        if not exitcode_match:
            logging.error("Could not extract exitcode before='{}'!".format(terminal.before))
            raise ValueError("Could not extract exitcode")

        result = {"exitcode": int(exitcode_match.group(1)), "out-data": out_data}
        logging.debug("execution exitcode={} output='{}'".format(result["exitcode"], result["out-data"]))
        return result
    finally:
        # Always release the console, including when pexpect raises.
        terminal.close()
def has_gw_connection(gateway="fd21:b4dc:4b00::a38:1"):
    """Check whether the VM can reach the gateway.

    Runs ``ping -c1 <gateway>`` inside the VM.

    Args:
        gateway: Address to ping. The default is the address that was
            previously hard-coded in this function.

    Returns:
        True if the ping command exited with status 0, otherwise False.
    """
    result = run_in_vm(["ping", "-c1", gateway])
    return result["exitcode"] == 0
def wait_for_gw_connection():
    """Block until the VM can ping the gateway.

    Makes up to 90 attempts with a one second pause after each failed one.

    Raises:
        TimeoutError: if none of the attempts succeeded.
    """
    logging.debug("Waiting for GW connection")
    for _ in range(90):
        try:
            if has_gw_connection():
                logging.debug("GW connection established")
                return
        except (subprocess.CalledProcessError, pexpect.ExceptionPexpect, ValueError):
            # The command is executed through a pexpect-driven console, so an
            # unresponsive VM shows up as a pexpect error or as console output
            # that cannot be parsed (ValueError), not as CalledProcessError.
            logging.debug("Pinging GW failed, VM likely not yet running.")
        # Pause between attempts so a ping that fails immediately does not
        # burn through all tries within a few seconds.
        time.sleep(1)
    raise TimeoutError("Time out waiting for GW connection")
def restore_snapshot(snap_name):
    """Roll the VM back to the snapshot *snap_name* using ``qm rollback``.

    Raises:
        subprocess.CalledProcessError: if ``qm`` exits with a non-zero status.
    """
    logging.debug("Restoring VM snapshot '{}'".format(snap_name))
    subprocess.check_call(["qm", "rollback", args.vmid, snap_name])
def create_snapshot(prefix="autoupdate_test"):
    """Create a VM snapshot (including VM state) and return its name.

    The name is built by passing *prefix* followed by
    ``_%Y_%m_%d_%H_%M_%S`` to ``strftime`` on the current local time.

    Raises:
        subprocess.CalledProcessError: if ``qm`` exits with a non-zero status.
    """
    name_format = prefix + "_%Y_%m_%d_%H_%M_%S"
    snap_name = datetime.now().strftime(name_format)
    logging.debug("Creating VM snapshot '{}'".format(snap_name))
    subprocess.check_call(["qm", "snapshot", args.vmid, snap_name, "--vmstate", "1"])
    return snap_name
def delete_snapshot(snap_name):
    """Remove the VM snapshot *snap_name* using ``qm delsnapshot``.

    Raises:
        subprocess.CalledProcessError: if ``qm`` exits with a non-zero status.
    """
    logging.debug("Deleting VM snapshot '{}'".format(snap_name))
    subprocess.check_call(["qm", "delsnapshot", args.vmid, snap_name])
@contextmanager
def create_snapshot_context():
    """Context manager that brackets its body with a baseline snapshot.

    On entry a fresh snapshot is created and its name is yielded. On exit,
    whether the body finished normally or raised, the VM is rolled back to
    that snapshot and the snapshot is deleted afterwards.
    """
    baseline = create_snapshot()
    try:
        yield baseline
    finally:
        logging.debug("Context manager left, restoring snapshot")
        # Roll back first, then drop the snapshot that is no longer needed.
        restore_snapshot(baseline)
        delete_snapshot(baseline)
def wait_for_unavailable():
    """Wait until ``qm guest cmd <vmid> ping`` starts failing.

    Polls up to 90 times, sleeping one second after every successful ping.

    Returns:
        True as soon as the ping command exits non-zero, False if it kept
        succeeding for all tries.
    """
    logging.debug("Waiting for VM to become unavailable...")
    ping_cmd = ["qm", "guest", "cmd", args.vmid, "ping"]
    for attempt in range(90):
        if subprocess.run(ping_cmd).returncode != 0:
            logging.debug("VM has become unavailable after {} tries".format(attempt))
            return True
        logging.debug("Try {}: VM available".format(attempt))
        time.sleep(1)
    logging.warning("Timeout waiting for VM to become unavailable")
    return False
def wait_for_available():
    """Wait until ``qm guest cmd <vmid> ping`` succeeds.

    Polls up to 90 times, sleeping one second after every failed ping.

    Returns:
        True as soon as the ping command exits with status 0, False if it
        kept failing for all tries.
    """
    logging.debug("Waiting for VM to become available...")
    ping_cmd = ["qm", "guest", "cmd", args.vmid, "ping"]
    for attempt in range(90):
        if subprocess.run(ping_cmd).returncode == 0:
            logging.debug("VM has become available after {} tries".format(attempt))
            return True
        logging.debug("Try {}: VM unavailable".format(attempt))
        time.sleep(1)
    logging.warning("Timeout waiting for VM to become available")
    return False
def wait_for_reboot():
    """Wait for the VM to go down and come back up again.

    Returns:
        True if the VM first became unavailable and then available again,
        False (after logging a warning) if either phase timed out.
    """
    # `and` short-circuits: we only wait for the VM to come back if it
    # actually went away first.
    rebooted = wait_for_unavailable() and wait_for_available()
    if not rebooted:
        logging.warning("Timeout waiting for VM reboot")
    return rebooted
def run_autoupdate():
    """Force an autoupdate inside the VM and wait until it has booted again.

    Sends ``autoupdater -f`` on the serial console and then follows the
    console output step by step: "Rebooting system", "Press enter to", the
    batman-adv "MLD Querier appeared" message (waiting up to 300 seconds for
    it) and finally a shell prompt.

    Raises:
        pexpect exceptions: if one of the expected console messages does not
            appear in time. The console is closed in every case.
    """
    logging.debug("Forcing autoupdate")
    terminal = spawn_console()
    try:
        logging.debug("executing autoupdater")
        terminal.sendline("autoupdater -f")
        logging.debug("Waiting for system reboot")
        terminal.expect("Rebooting system")
        logging.debug("Reboot detected, waiting for console...")
        terminal.expect("Press enter to")
        logging.debug("Waiting for MLD Querier message from batman...")
        terminal.expect("batman_adv: bat0: MLD Querier appeared", timeout=300)
        logging.debug("MLD Querier appeared, activating console")
        # Two newlines are sent before waiting for the prompt.
        # NOTE(review): presumably the first one activates the console and
        # the second one provokes a fresh prompt - confirm.
        terminal.sendline()
        terminal.sendline()
        terminal.expect(PROMPT_REGEX)
        logging.debug("System booted")
    finally:
        # Release the serial console explicitly, as run_in_vm does, instead
        # of relying on garbage collection of the pexpect object.
        terminal.close()
def assert_release(release_to_assert):
    """Check that the VM runs the firmware release *release_to_assert*.

    Reads ``/lib/gluon/release`` inside the VM and compares its stripped
    content with the expected release string.

    Returns:
        True if the file could be read (exit code 0) and matches, otherwise
        False after logging a warning.
    """
    result = run_in_vm(["cat", "/lib/gluon/release"])
    running_release = result["out-data"].strip()
    logging.debug("Found release '{}'".format(running_release))

    read_failed = result["exitcode"] != 0
    if read_failed or running_release != release_to_assert:
        logging.warning("Release '{}' does not match expected '{}'".format(running_release, release_to_assert))
        return False
    return True
# Main test loop. The baseline snapshot taken here is rolled back to after
# every successful cycle. Leaving the `with` block (including via sys.exit)
# rolls back once more and deletes the baseline snapshot.
with create_snapshot_context() as snapshot:
    for cycle in range(args.updatecount):
        logging.info("Update cycle number {}".format(cycle))

        # The VM must start every cycle on the "before" firmware.
        started_on_expected_release = assert_release(args.before_version)
        if not started_on_expected_release:
            logging.error("Not running the expected version, aborting")
            sys.exit(1)

        wait_for_gw_connection()
        run_autoupdate()

        updated_to_expected_release = assert_release(args.after_version)
        if not updated_to_expected_release:
            logging.error("Not running the expected version after update, aborting")
            # Keep the failed state around for later investigation.
            create_snapshot(prefix="test_fail_{}".format(cycle))
            sys.exit(1)

        # Update succeeded: go back to the baseline for the next cycle.
        restore_snapshot(snapshot)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment