Overview

The Python program on this page illustrates how to update the firmware of groov EPIC and RIO devices.

The program is also useful on its own, for example to update firmware from a shell script.

Prerequisites

Python 3.10.12 was used when creating the example program. Earlier versions might work but are untested.

Several Python packages also need to be installed.

Using pip on Windows or Linux, install the packages with this command:

pip install --user requests halo
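
To confirm that both packages can be imported before running the updater, a quick check along these lines may help. This is only a minimal sketch; the helper name missing_packages is hypothetical and is not part of groov-update.py.

```python
import importlib.util


def missing_packages(names):
    """Return the names from `names` that cannot be found for import."""
    return [name for name in names
            if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # "requests" and "halo" are the two packages installed by the pip command.
    missing = missing_packages(["requests", "halo"])
    if missing:
        print("Missing packages: " + ", ".join(missing))
    else:
        print("All required packages are installed.")
```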

Usage

To call the program, we need to pass in three pieces of information:

  • The device’s hostname or IP address as the first argument.
  • The firmware file as the second argument.
  • An API Key for a Groov Admin user. This can be passed either in the optional --apikey argument or in an environment variable named “GROOV_API_KEY”.

For example, on Linux:

python3 groov-update.py 192.168.137.22 /home/opto/grv-epic-pr1-ignition7-3.6.0-b.32.field.bin --apikey=DjV43UWfNoChHKkPZaebwjacH8w3Pvmx

Help:

$ python3 groov-update.py --help

usage: groov-update [-h] [-k [APIKEY]] address firmware

Update the firmware on a groov EPIC or RIO.

positional arguments:
  address               IP address or hostname of a Groov EPIC or RIO device
  firmware              The firmware to install

options:
  -h, --help            show this help message and exit
  -k [APIKEY], --apikey [APIKEY]
                        The API Key of a Groov Admin user. Alternatively, the key can be passed in
                        an Environment Variable named "GROOV_API_KEY".

Environment Variable

While the details are beyond the scope of this page, it’s considered a best practice to inject secrets like an API key into a command shell’s environment rather than pass them on the command line, where they can show up in shell history and process listings. This is why the API key can optionally come from an environment variable named “GROOV_API_KEY”.
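
As an illustration of that practice, the sketch below launches groov-update.py from another Python script and hands it the key through the child process's environment instead of the command line. The helper names (build_update_command, build_environment, run_update) are hypothetical, and the script is assumed to be in the current working directory.

```python
import os
import subprocess
import sys


def build_update_command(address, firmware, script="groov-update.py"):
    """Build the argument list; the API key is deliberately left out."""
    return [sys.executable, script, address, firmware]


def build_environment(api_key):
    """Copy the current environment and add the variable the script reads."""
    env = dict(os.environ)
    env["GROOV_API_KEY"] = api_key
    return env


def run_update(address, firmware, api_key):
    """Run the updater and return its exit code (0 means success)."""
    result = subprocess.run(build_update_command(address, firmware),
                            env=build_environment(api_key))
    return result.returncode
```

Calling run_update with a device address, a firmware path, and a key returns the script's exit code, which groov-update.py sets to 1 on any failure.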

Source Code

groov-update.py:

import requests
import json
import argparse
import math
import urllib3
import time
import random
import os
import sys
from halo import Halo

# The Halo library (https://pypi.org/project/halo/) is used to show progress.
progress = Halo(spinner='dots')


# Disable SSL warnings. It's common to use self-signed certificates with
# Groov devices, but the library will print out warnings to the console.
urllib3.disable_warnings()

# For demonstration purposes, do not verify the server's certificate, since
# devices have a self-signed certificate by default.
VERIFY_SERVER_CERT = False

NORMAL_REQUEST_TIMEOUT = 15  # RIO needs a longer timeout than EPIC.
UPLOAD_REQUEST_TIMEOUT = 300


def main():

    # Get the arguments from the command line.
    cmd_args = get_args()

    print(f'Updating device at "{cmd_args.address}" with '
          f'firmware "{cmd_args.firmware}"')

    # Initialize the update
    init_success = initialize_update(cmd_args.address, cmd_args.apikey)
    if (not init_success):
        sys.exit(1)

    # Upload the firmware
    upload_success = upload_firmware(
        cmd_args.address, cmd_args.apikey, cmd_args.firmware)
    if (not upload_success):
        sys.exit(1)

    # Monitor status
    update_success = update_status_until_done(cmd_args.address,
                                              cmd_args.apikey)

    if (not update_success):
        sys.exit(1)

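    # Wait for the two restarts
    time.sleep(10)
    restart_success = wait_for_restarts(cmd_args.address, 600)
    if (not restart_success):
        print("ERROR: Timed out waiting for the device to restart.")
        sys.exit(1)

    print("The device is updated and ready.")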


# Get the command-line arguments
def get_args():
    parser = argparse.ArgumentParser(
        prog='groov-update',
        description='Update the firmware on a groov EPIC or RIO.',
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-k', '--apikey',
                        nargs='?',
                        help='The API Key of a Groov Admin user. '
                        'Alternatively, the key can be passed in an '
                        'Environment Variable named "GROOV_API_KEY".')
    parser.add_argument('address',
                        help='IP address or hostname of a Groov EPIC '
                        'or RIO device')
    parser.add_argument('firmware',
                        help='The firmware to install')

    parsed_args = parser.parse_args()

    # If there's not an API key from the arguments, then there must be one
    # from the 'GROOV_API_KEY' environment variable.
    # A key from the argument takes precedence over the environment variable.
    if (('apikey' not in parsed_args) or (parsed_args.apikey is None)):
        env_api_key = get_api_key_from_environment()

        if (env_api_key is None):
            print('An API Key is required, either in an environment '
                  'variable named "GROOV_API_KEY" or from the "--apikey" '
                  'argument.')
            sys.exit(1)
        else:
            parsed_args.apikey = env_api_key

    return parsed_args


# It's considered a Best Practice to inject secrets into programs through
# environment variables rather than a command-line argument.
def get_api_key_from_environment():
    if ('GROOV_API_KEY' in os.environ):
        return os.environ['GROOV_API_KEY']
    else:
        return None


# Initialize the firmware update.
def initialize_update(address, api_key):

    # Tell the device that we are going to upload firmware.
    # POST to the "/manage/api/v1/maintenance/update/prepare" endpoint.
    # The data object has a very important property called "preserveSettings".
    # If set to True, the device's settings will be preserved.
    # If set to False, nothing will be preserved and the device will need to
    # be recommissioned.
    prepare_response = requests.post(
        "https://" + address + "/manage/api/v1/maintenance/update/prepare",
        json={
            "preserveSettings": True,

            # In 4.0 and earlier firmware, "backupNetwork" meant to preserve
            # settings. For clarity, in 4.1 it was renamed to
            # "preserveSettings". For backwards compatibility with earlier
            # firmware, we can just set them both.
            "backupNetwork":    True
        },
        headers={"apiKey": api_key},
        verify=VERIFY_SERVER_CERT,
        timeout=NORMAL_REQUEST_TIMEOUT)

    if (prepare_response.status_code == 200):
        return True
    else:
        print("ERROR when initializing the firmware update. Status code: " +
              str(prepare_response.status_code))

        if (prepare_response.status_code == 401):
            print('UNAUTHORIZED. Check the API Key.')

    return False


# Upload the firmware
def upload_firmware(address, api_key, filename):

    progress.start("Uploading firmware")

    # The firmware must be broken up into manageable chunks.
    # Each chunk is uploaded separately, one after another.
    CHUNK_SIZE = 25000000

    # Determine the number of chunks to be uploaded.
    file_size = os.path.getsize(filename)
    num_chunks = math.ceil(file_size / CHUNK_SIZE)

    # Use a random ID to identify this update. This is used by Groov Manage to
    # reassemble the chunks.
    file_id = random.randint(0, 10000000)

    # Loop over the chunks
    with open(filename, mode='rb') as file:
        chunk_index = -1

        while (True):
            chunk_index += 1

            # Read a chunk of the firmware update file.
            chunk = file.read(CHUNK_SIZE)
            chunk_length = len(chunk)

            # If no data is left, then break out of this loop.
            if (chunk_length == 0):
                progress.succeed()
                return True

            progress.text = f"Uploading firmware (part {chunk_index+1} " \
                f"of {num_chunks})"

            # The "apply" endpoint needs several headers.
            upload_headers = {
                'uploader-chunk-number': str(chunk_index),
                'uploader-chunks-total': str(num_chunks),
                'uploader-file-id': str(file_id),
                'apiKey': api_key
            }

            # POST the file chunk to "/manage/api/v1/maintenance/update/apply"
            apply_response = requests.post(
                "https://" + address +
                "/manage/api/v1/maintenance/update/apply",
                headers=upload_headers,
                files={'file': chunk},
                verify=VERIFY_SERVER_CERT,
                timeout=UPLOAD_REQUEST_TIMEOUT)

            # Check for any errors.
            if (apply_response.status_code >= 400):
                progress.fail()
                print("ERROR - Status code: " +
                      str(apply_response.status_code))
                return False

    progress.fail()

    return False


# Check the update status until an error occurs or it finishes.
def update_status_until_done(address, api_key):

    progress.start('Installing firmware')

    # Check the status until the update process is complete.
    while (True):
        # GET the status from the "/manage/api/v1/maintenance/update/status"
        # endpoint.
        status_response = requests.get(
            "https://" + address + "/manage/api/v1/maintenance/update/status",
            headers={"apiKey": api_key},
            verify=VERIFY_SERVER_CERT,
            timeout=NORMAL_REQUEST_TIMEOUT)

        # Check for an error in the request.
        if (status_response.status_code != 200):
            progress.fail()
            print("ERROR - Status code: " +
                  str(status_response.status_code))
            return False

        # Convert JSON text to Python object
        try:
            status_data = json.loads(status_response.text)
        except Exception:
            progress.fail()
            print(f"Could not parse response as JSON: {status_response.text}")
            return False

        # Check for any errors in the update process.
        if (check_status_errors(status_data)):
            progress.fail()
            return False

        # Print out the status.
        update_status(status_data)

        # Wait several seconds. It must be less than 5 seconds or we might
        # miss the final update.
        time.sleep(3)

        if (status_data['finishAndRestart']['status'] == 'done'):
            progress.succeed()
            return True


# Check the status for errors. Any error will cause the update to fail.
def check_status_errors(status_response):
    # Loop over the keys in the status response.
    for key, value in status_response.items():
        # Some entries (such as "active") are plain strings, not objects.
        if (isinstance(value, dict) and value.get('status') == 'error'):

            print()  # flush the line
            print('ERROR CODE: ' + str(value['errorCode']))

            if ('errorMessage' in value):
                print('ERROR MESSAGE: ' + value['errorMessage'])
            if ('errorData' in value):
                print('ERROR DATA: ' + str(value['errorData']))

            return True
    return False


# Check the update status.
#
# A typical status object looks like this:
# {
#     "prepare":          { "status": "done" },
#     "upload":           { "status": "done" },
#     "reassemble":       { "status": "done" },
#     "unzip":            { "status": "done" },
#     "decrypt":          { "status": "done" },
#     "decompress":       { "status": "done" },
#     "backupSettings":   { "status": "done" },
#     "install":          { "status": "running" },
#     "finishAndRestart": { "status": "pending" },
#     "active":           "active"
# }
def update_status(status_response):
    # Start a new line with the name of the stage that is now running.
    if (status_response['reassemble']['status'] == 'running'):
        progress.text = 'Reassembling firmware (step 1 of 7)'
    elif (status_response['unzip']['status'] == 'running'):
        progress.text = 'Unzipping firmware (step 2 of 7)'
    elif (status_response['decrypt']['status'] == 'running'):
        progress.text = 'Decrypting firmware (step 3 of 7)'
    elif (status_response['decompress']['status'] == 'running'):
        progress.text = 'Decompressing firmware (step 4 of 7)'
    elif (('backupSettings' in status_response and
           status_response['backupSettings']['status'] == 'running') or
          ('backupNetwork' in status_response and
           status_response['backupNetwork']['status'] == 'running')):
        # 'backupNetwork' was used in 1.0 to 4.0 firmware.
        # In 4.1, it was renamed to 'backupSettings'.
        progress.text = 'Backing up the system (step 5 of 7)'
    elif (status_response['install']['status'] == 'running'):
        progress.text = 'Installing firmware (step 6 of 7)'
    elif (status_response['finishAndRestart']['status'] == 'done'):
        progress.text = 'Restarting (step 7 of 7)'


# Wait for the two restarts. During that time, the web server is not running, so
# all REST API requests will naturally fail.
def wait_for_restarts(address, max_duration_sec):

    progress.start('Wait for restarts to finish')

    start_time = time.monotonic()

    # Check if the Auth service is responding. If so, then the device
    # has fully started.
    while (True):

        try:
            # GET the status from the "/auth/access/user/commission/status"
            # endpoint.
            # Many other endpoints could be used instead.
            status_response = requests.get(
                "https://" + address + "/auth/access/user/commission/status",
                verify=VERIFY_SERVER_CERT,
                timeout=NORMAL_REQUEST_TIMEOUT)

            # Check for success
            if (status_response.status_code == 200 and
                    status_response.text == "true"):
                progress.succeed()
                return True
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout):
            # It's normal to get a connection error or a timeout while the
            # device is restarting and isn't responding yet.
            pass

        # Check the elapsed time.
        elapsed_time = time.monotonic() - start_time
        if (elapsed_time > max_duration_sec):
            progress.fail()
            return False

        # Wait several seconds. This is optional because the attempt to reach
        # the device also takes several seconds before failing.
        time.sleep(3)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        progress.stop_and_persist()
        print('Terminated by user')
        sys.exit(1)
    except Exception:
        progress.fail()
        print("ERROR: " + str(sys.exc_info()[1]))
        sys.exit(1)
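
Chunking Example

The chunking arithmetic in upload_firmware can be checked in isolation. The sketch below reuses the 25,000,000-byte chunk size and the header names from the program above; the helper names chunk_count and chunk_headers, the file size, the file ID, and the key are hypothetical, and no request is sent to a device.

```python
import math

CHUNK_SIZE = 25000000  # same value as in upload_firmware


def chunk_count(file_size, chunk_size=CHUNK_SIZE):
    """Number of chunks needed to cover file_size bytes."""
    return math.ceil(file_size / chunk_size)


def chunk_headers(chunk_index, num_chunks, file_id, api_key):
    """Headers sent with one chunk; chunk_index is zero-based."""
    return {
        'uploader-chunk-number': str(chunk_index),
        'uploader-chunks-total': str(num_chunks),
        'uploader-file-id': str(file_id),
        'apiKey': api_key,
    }


if __name__ == "__main__":
    # A hypothetical 60,000,000-byte file needs three chunks:
    # two full chunks and a final one of 10,000,000 bytes.
    total = chunk_count(60000000)
    for index in range(total):
        print(chunk_headers(index, total, 1234, "example-key"))
```

Note that the chunk number in the header starts at 0, while the progress text shown to the user counts from 1.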