Robot Microservices

Introduction

The April Robots Offline app uses microservices (MS), one developed for each robot model. The app runs each microservice as a standalone FastAPI application inside a Docker/Podman container in WSL on Windows machines (see System Overview). When the app is launched, it automatically pulls the latest compatible images (TODO: add ref to releases.md where this is explained by @shinratttensei1), builds them, and runs them.

A microservice serves four main purposes:

  1. Run Python action scripts on a specific robot model with action queuing and emergency abort.
  2. Provide an API (REST endpoints) for additional robot-related operations (volume, battery).
  3. Collect observations – raw data and computed metrics (see Robot Metrics) during execution of action scripts.
  4. Do all of the above in a generic architecture that can be easily adapted to any robot model.

As of 08.05.26, AR uses these microservices:

  • NAO MS – first MS with legacy structure, relying on synchronous functions. Main dev: Alikhan.
  • Furhat MS – second MS based on NAO MS, but refactored and modified to work with asynchronous functions. Main dev: Olzhas.
  • Cozmo MS – third MS based on Furhat MS, not yet released. Main dev: Zhannur.

Planned:

  • Vector MS – slightly adapted from Cozmo?
  • QT MS
  • others…

General structure

The documentation below is based on Furhat MS. NAO MS has a slightly different implementation and naming; its documentation can be found in its repository.

Generic components (should not change across robot microservices):

  • Routing & routers (routers/action.py, routers/robot.py).
  • Schemas for routes and Action entity (schemas/action.py, schemas/api.py).
  • Everything in core/ (queue management, subprocess management, main service).

Custom components (adapted for each implementation):

  • Action scripts in actions/ because action scripts use robot-specific API (libraries).
  • The bodies of functions and classes in interface/ (observer, metrics, operations, robot interface), changed only to an extent that does not require larger modifications to the architecture.
  • Raw data and metrics schemas in schemas/observation/.

Although the microservice architecture is designed so that only custom components need to change, it is sometimes impossible to adapt the MS to a new robot without more significant modifications. In such cases, change as little as possible and update this documentation.

Explanation of core components and terms

  • Action (schemas/action.py) – exports Action schema (Pydantic model) for managing running action scripts and raw observations. Note that this schema is not connected to the Action database model in the offline app.
  • Action scripts – Python files that expose a run function, which uses a Robot interface instance to manipulate a robot (see Action script example).
  • Operations (interface/operations.py) – exports functions that the Service uses for additional robot-related operations (purpose 2), and the create_robot_interface function for initializing the Robot interface (see below). All functions can be slightly adapted to work for every robot, but function signatures should not be changed.
    • create_robot_interface – called whenever something needs to be done with the robot (e.g. run an action, set volume, etc.). This function initializes a robot client and establishes a connection to the robot via the client.
    • connection_operation – an operation that is performed when the offline app sends a request to connect. This function does not connect to the robot. Instead, it uses an already connected Robot interface instance to put the robot into a proper state (resets position and anything else).
    • stop_operation – similar to connection_operation, this operation is used to reset the robot after an action script has been interrupted.
    • battery_operation – get the battery level.
    • volume_operation – set and return volume level.
  • Robot interface (interface/robot.py) – a public class that provides the full API for working directly with a robot, used by Operations and in action scripts. The Robot interface operates on already connected robot clients. It is initialized only by calling create_robot_interface (see above).
  • Queue manager (core/queue_manager.py) – singleton class used by routers to enqueue action scripts and get actions’ status. Manages the action queue, delegates execution to the Service, and sends HTTP callbacks with execution results and collected metrics to the backend. Lower-level explanation is provided in the code.
  • Subprocess manager (core/subprocess_manager.py) – singleton class used to manage the subprocess that runs an action script and an observer thread. Lower-level explanation is provided in the code.
  • Service (core/service.py) – singleton class used to execute action scripts and operations with retries. Executes action scripts in isolated subprocesses. Also coordinates observation collection, metrics calculation, and robot interface initialization. Lower-level explanation is provided in the code.
  • Observations – things collected by the Observer thread, AKA raw data. Consist of two separate types:
    • State observation (AKA event-agnostic state) – periodically collected homogeneous data.
    • Event-related observation – data collected when a real-time event occurs. The data varies with the type of the event.
  • Observer thread – the component responsible for collecting raw observations. It is implemented as a subclass of threading.Thread and consists of two parts:
    • Abstract observer thread class (core/observer_base.py) – generic abstract class that provides things used by the custom observer thread subclass implementation.
    • Custom observer thread subclass (interface/observer.py) – collects raw observations periodically (every 1 second) or based on real-time events.
  • Metrics computation (metric_computation.py) – exports generic function compute_metrics that uses raw data to calculate Robot Metrics for an Action instance.
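
The split between the abstract observer thread class and its custom subclass can be illustrated with a minimal sketch. It is not the code from core/observer_base.py or interface/observer.py: the class names, read_robot_state, collect_event, and the signature of collect_state are assumptions made for illustration, and only the threading.Thread base and the 1-second default interval come from the description above.

```python
import threading
import time
from abc import ABC, abstractmethod


class ObserverThreadBase(threading.Thread, ABC):
    """Generic part: stop handling and storage of raw observations."""

    def __init__(self, interval_s: float = 1.0) -> None:
        super().__init__(daemon=True)
        self.interval_s = interval_s
        self.states = []  # periodic state observations
        self.events = []  # event-related observations
        self._stop_requested = threading.Event()
        self._store_lock = threading.Lock()

    def collect_state(self, state: dict) -> None:
        """Store one state observation (signature assumed for this sketch)."""
        with self._store_lock:
            self.states.append(state)

    def collect_event(self, event: dict) -> None:
        """Store one event-related observation (hypothetical helper)."""
        with self._store_lock:
            self.events.append(event)

    def stop(self) -> None:
        self._stop_requested.set()

    def run(self) -> None:
        # Sample once per interval; wait() returns early when stop() is called.
        while not self._stop_requested.is_set():
            self.collect_state(self.read_robot_state())
            self._stop_requested.wait(self.interval_s)

    @abstractmethod
    def read_robot_state(self) -> dict:
        """Robot-specific part: return the current raw state."""


class BatteryObserver(ObserverThreadBase):
    """Custom part for an imaginary robot that only reports battery level."""

    def __init__(self, read_battery, interval_s: float = 1.0) -> None:
        super().__init__(interval_s)
        self._read_battery = read_battery

    def read_robot_state(self) -> dict:
        return {"timestamp": time.time(), "battery": self._read_battery()}
```

In this arrangement, a new robot would only need its own subclass with a different read_robot_state, while starting, stopping, and storing observations stay in the generic base class.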

Creating a new MS

  1. Clone code from the latest release of the Furhat MS or a later MS, following repository naming conventions from the guidelines.
  2. Modify the Robot interface (robot.py) and create_robot_interface to use an appropriate robot client.
  3. Modify Operations (operations.py) to work with the new Robot interface.
  4. Write action scripts using the Robot interface (see example).
  5. Modify raw data and metrics schemas in schemas/observation/ with what will be collected and computed.
  6. Modify the observer subclass (interface/observer.py) to collect expected raw data. Note that the raw data passed to collect_state should match EventAgnosticState schema.
  7. Modify subprocess_manager.py to match the updated MetricsDict type in _create_shared_observations().
  8. Modify metric_computation.py to compute the metrics using raw data.
  9. Modify config.py with appropriate robot name, port, etc.
  10. Increment the microservice port in Dockerfile, Docker Compose configs, and mise.toml (e.g. NAO uses 5050, Furhat uses 5051, so the next MS should use 5052). Use CTRL + SHIFT + F to find all occurrences.
  11. Run tests.
  12. Modify Offline App core to use the new robot microservice.
  13. Modify the Offline App installer and launcher to use the new robot microservice image/container/pod.
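
Steps 5 and 8 are easier to picture with a small sketch of a raw-data schema and a metrics function built on it. This is a hedged illustration only: dataclasses stand in for the Pydantic models, the fields of EventAgnosticState, the ActionMetrics class, and the signature of compute_metrics are all hypothetical and will differ from the real schemas in schemas/observation/ and from metric_computation.py.

```python
from dataclasses import dataclass
from statistics import mean
from typing import Optional


@dataclass
class EventAgnosticState:
    """One periodic state observation (fields are hypothetical)."""
    timestamp: float
    battery_level: int
    users_in_view: int


@dataclass
class ActionMetrics:
    """Computed metrics for one action (hypothetical class and fields)."""
    duration_s: float
    battery_used: int
    mean_users_in_view: float


def compute_metrics(states: list) -> Optional[ActionMetrics]:
    """Reduce raw state observations to metrics; None if nothing was collected."""
    if not states:
        return None
    # Sort by time so the first and last samples bracket the action.
    ordered = sorted(states, key=lambda s: s.timestamp)
    return ActionMetrics(
        duration_s=ordered[-1].timestamp - ordered[0].timestamp,
        battery_used=ordered[0].battery_level - ordered[-1].battery_level,
        mean_users_in_view=mean(s.users_in_view for s in ordered),
    )
```

Keeping the raw schema and the metrics function next to each other like this makes it clear which fields the observer has to collect before a given metric can be computed.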

Examples

Action script example

NAO MS: general/goodbyeRUS.py

from app.interface.robot import RobotInterface


def run(robot_interface: RobotInterface):
    """Behavior converted from old_actions/old_actions.general/goodbyeRUS.py"""
    try:
        robot_interface.play_audio("animalsRUS/bye.wav")
        # Main script actions
        robot_interface.run_behavior("Stand/Gestures/Hey_1")

    except Exception:
        # Re-raise unchanged so the caller sees the original error.
        raise

Furhat MS: doctor/IntroRUS.py

import time

from actions.gestures import perform_lip_sync, perform_lip_sync_1, perform_lip_sync_2
from app.interface.robot import RobotInterface


async def run(robot_interface: RobotInterface) -> None:
    await robot_interface.realtime_api.request_attend_user()
    furhat = robot_interface.remote_api
    furhat.set_face(mask="adult", character="Jane")
    perform_lip_sync_1(
        furhat, 7, "file:///home/furnix/resources/DoctorRUS/IntroRUS/IntroRUS1_n.wav")
    time.sleep(2)
    perform_lip_sync_2(
        furhat, 4, "file:///home/furnix/resources/DoctorRUS/IntroRUS/IntroRUS2_n.wav")
    time.sleep(2)
    perform_lip_sync_2(
        furhat, 4, "file:///home/furnix/resources/DoctorRUS/IntroRUS/IntroRUS3_n.wav")
    time.sleep(2)
    perform_lip_sync(
        furhat, 3, "file:///home/furnix/resources/DoctorRUS/IntroRUS/IntroRUS4_n.wav")
    time.sleep(2)
    perform_lip_sync_2(
        furhat, 7, "file:///home/furnix/resources/DoctorRUS/IntroRUS/IntroRUS5_n.wav")

Unlike NAO MS, Furhat MS uses async def run.
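
Because the two microservices differ in whether run is a plain function or a coroutine, a generic runner would need to handle both. The sketch below shows one possible way to do that with the standard library; call_run is a hypothetical helper and is not taken from either microservice.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_run(run: Callable[[Any], Any], robot_interface: Any) -> Any:
    """Call an action script's run(), whether it is sync or async."""
    if inspect.iscoroutinefunction(run):
        # Furhat-style script: await the coroutine directly.
        return await run(robot_interface)
    # NAO-style script: run the blocking function in a worker thread
    # so the event loop stays responsive.
    return await asyncio.to_thread(run, robot_interface)
```

With such a helper, the caller can always write await call_run(module.run, robot_interface) regardless of which style the action script uses.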

Flow chart for processing actions with all key components

    flowchart TD

%% =======================
%% Endpoint Flow
%% =======================
Start([Endpoint Start])
    -->|POST /action| EP_REQ[Endpoint coro]
    --> QM1["QM.enqueue_action()"]

QM1 -->|Enter QM| QM2["Create Action"]
QM2 --> QM3["Put Action to queue"]
QM3 --> QM4["Create asyncio task"]

QM4 -->|Exit QM| QM5["~QM.enqueue_action()"]
QM5 -->|Return response| EP_RES[Endpoint finished]
EP_RES --> End([End])


%% =======================
%% Queue Manager Task
%% =======================
QM4 -->|Enter QM task| T_Start([Task Start])
T_Start --> T_Main["QM queue processing coro()"]

T_Main --> T_LoopStart[Actions processing loop]@{shape: notch-pent}
T_LoopStart --> T_Check{loop flag?}

T_Check -->|True| T_Get["get Action from queue"]
T_Get --> T_Run["Service run action script coro()"]

T_Check -->|False| T_Stop["~QM queue processing coro()"]
T_Stop -->|Exit QM task| T_End([Task End])


%% Loop continuation
T_Run -->|Exception| T_OnComplete1["QM on action complete coro()"]
T_OnComplete1 --> T_LoopEnd[End of action processing loop]@{shape: notch-pent}
T_LoopEnd -->|Repeat the processing loop| T_LoopStart


%% =======================
%% Service Coroutine
%% =======================
T_Run -->|Enter coro| Srv1["PM.init_subprocess()"]
Srv1 --> Srv2["PM.start_subprocess()"]
Srv2 --> Srv3["await PM.join_subprocess()"]
Srv3 --> Srv4["PM.finalize_observations()"]
Srv4 --> Srv5["PM.get_result_status()"]
Srv5 --> Srv6["QM on action complete coro()"]
Srv6 -->|Exit coro| Srv7["~Service run action script coro()"]

Srv7 --> T_LoopEnd


%% =======================
%% Subprocess Flow
%% =======================
Srv2 -->|Enter subprocess| SP_Start([Subprocess Start])

SP_Start --> SP1["Service.action_runner_wrapper()"]
SP1 -->|Enter event loop| SP2["Service run action module coro()"]
SP2 --> SP3["Load the script"]
SP3 --> SP4["await create_robot_interface()"]
SP4 --> SP5["make_observation_thread()"]
SP5 --> SP6["Start observation thread"]
SP6 --> SP7["await module.run()"]
SP7 --> SP8["Join the observer thread"]
SP8 --> SP9["~Service run action module coro()"]

SP9 -->|Exit event loop| SP10["Put result to queue"]
SP10 --> SP11["~Service.action_runner_wrapper()"]

SP11 --> SP_Stop([Subprocess End])
SP_Stop -->|Exit subprocess| Srv3


%% =======================
%% Observation Thread Flow
%% =======================
SP6 -->|Enter obs thread| OT_Start([Observation Thread Start])
OT_Start --> OT1["OI._run()"]
OT1 --> OT2["Start the event loop thread"]

%% Job Loop
OT2 --> OT3["Job processing loop"]@{shape: notch-pent}
OT3 --> OT4{"Stop event is set?"}

OT4 -->|True| OT10["Clean up event loop thread"]
OT10 --> OT11["Stop the event loop"]
OT11 --> OT12["~OI._run()"]
OT12 --> OT_End([Observation Thread End])

OT4 -->|False| OT5["Try get from jobs queue"]
OT5 --> OT6{"Got a job?"}

OT6 -->|True| OT7["TODO: run CV models"]
OT7 --> OT8["OT.record_state()"]
OT8 -->|Repeat the loop| OT3

OT6 -->|False| OT9["Sleep X seconds"]@{shape: delay}
OT9 -->|Repeat the loop| OT3
OT_End -->|Exit obs thread| SP8

%% Event loop thread
OT2 -->|Enter event loop thread| ELT1["Set up EL thread (handlers, requests)"]
ELT1 --> ELT2[/"Receive realtime client data"/]

ELT2 -->|Users data handler| ELT4["Get latest frame"]
ELT4 --> ELT5["Put new job with camera & users data to jobs queue"]
ELT5 --> ELT2

ELT2 -->|Camera data handler| ELT3["Save latest frame"]
ELT3 --> ELT2
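
The endpoint and queue manager task parts of the flow chart can be approximated with a short asyncio sketch. It is a simplification under stated assumptions: QueueManagerSketch, wait_idle, the callback signatures, and the status strings are hypothetical, the subprocess and observer thread are replaced by a single run_action coroutine, and the completion callback is invoked from the loop in both the success and the exception case.

```python
import asyncio
from typing import Awaitable, Callable


class QueueManagerSketch:
    """Queues action names and processes them one at a time."""

    def __init__(
        self,
        run_action: Callable[[str], Awaitable[str]],
        on_complete: Callable[[str, str], Awaitable[None]],
    ) -> None:
        self._run_action = run_action
        self._on_complete = on_complete
        self._queue = asyncio.Queue()
        self._task = None
        self._busy = False

    def enqueue_action(self, name: str) -> bool:
        """Queue an action; must be called from inside a running event loop."""
        is_first = self._queue.empty() and not self._busy
        self._queue.put_nowait(name)
        # Start the processing task only if none is currently active.
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._process_queue())
        return is_first

    async def _process_queue(self) -> None:
        while not self._queue.empty():
            name = self._queue.get_nowait()
            self._busy = True
            try:
                status = await self._run_action(name)
            except Exception as exc:
                # A failing action must not stop the processing loop.
                status = f"failed: {exc}"
            finally:
                self._busy = False
            await self._on_complete(name, status)

    async def wait_idle(self) -> None:
        """Wait until the current processing task has drained the queue."""
        if self._task is not None:
            await self._task
```

The endpoint returns as soon as enqueue_action has created the task, which matches the chart: the response is sent while the queue processing coroutine keeps running in the background.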
  

Sequence diagram for data exchange between backend and MS

    sequenceDiagram
participant F as Frontend
participant B as Backend
participant MS as MS interface
participant QM@{ "type": "control" } as MS Queue Manager

    F->>+B: User requests action start
    B->>B: Create and save PerformedAction
    B->>+MS: Request to enqueue the action
    MS->>QM: Call QM.enqueue_action()
    QM->>MS: Is first in queue or not
    MS->>-B: Respond 200 with `is_first`
    B->>B: Update PerformedAction status <br> to "Running" or "Queued"
    B->>-F: Respond with 200 or 201

    Note over F, QM: Asynchronously whenever actions queue is not empty <br> and queued actions ARE NOT stopped.
    QM->>QM: Retrieve action from queue
    QM->>QM: Process action until end
    QM->>B: Send action execution result with all data and metrics
    B->>B: Update PerformedAction

    Note over F, QM: Asynchronously whenever actions queue is not empty <br> and queued actions ARE stopped.
    QM->>QM: Retrieve action from queue
    QM->>QM: Start processing action with Service
    F->>+B: User requests action stop
    B->>+MS: Request to stop all actions
    MS->>QM: Call QM.stop_all_actions()
    QM->>QM: Stop running action via Service <br> (Terminates the subprocess <br> and performs the stop operation)
    QM->>B: Send action execution result with all data and metrics
    B->>B: Update PerformedAction

    loop While queue is not empty
    QM->>QM: Retrieve action from queue
    QM->>QM: Process action stopping
    QM->>B: Send action non-execution result with all data
    B->>B: Update PerformedAction
    end

    QM->>MS: Return result of stopping <br> the running action
    MS->>-B: Return result of stopping <br> the running action
    B->>B: Update PerformedAction <br> (actually does nothing as of May 2026)
    B->>-F: Return result of stopping <br> the running action
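
The enqueue and stop-all exchanges in the diagram can be reduced to a small in-memory sketch. It assumes a simplified model: ActionQueueSketch, backend_status, and the result strings "stopped" and "not_executed" are hypothetical, and no HTTP calls or subprocesses are involved. Only the is_first flag and the "Running" and "Queued" statuses come from the diagram.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ActionQueueSketch:
    """Tracks one running action and the actions waiting behind it."""
    running: Optional[str] = None
    waiting: deque = field(default_factory=deque)

    def enqueue_action(self, name: str) -> bool:
        """Return is_first: True when the action can start right away."""
        if self.running is None:
            self.running = name
            return True
        self.waiting.append(name)
        return False

    def stop_all_actions(self) -> list:
        """Stop the running action, then report queued ones as not executed."""
        results = []
        if self.running is not None:
            results.append((self.running, "stopped"))
            self.running = None
        # Drain the queue so every waiting action gets a result as well.
        while self.waiting:
            results.append((self.waiting.popleft(), "not_executed"))
        return results


def backend_status(is_first: bool) -> str:
    """Status the backend stores on PerformedAction after enqueueing."""
    return "Running" if is_first else "Queued"
```

Each tuple returned by stop_all_actions corresponds to one result message that the queue manager sends back to the backend in the diagram.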