Skip to content

Reference for MissionManager

This is the primary method for interfacing with moos-ivp-agent's BHV_Agent


It is recommended to use MissionManager with the python context manager

from mivp_agent.manager import MissionManager

with MissionManager('trainer') as mgr:
  mgr.wait_for(['felix', 'evan'])
Source code in mivp_agent/
class MissionManager:
    This is the primary method for interfacing with moos-ivp-agent's BHV_Agent

      It is recommended to use MissionManager with the python context manager

      from mivp_agent.manager import MissionManager

      with MissionManager('trainer') as mgr:
        mgr.wait_for(['felix', 'evan'])

    def __init__(self, task, log=True, immediate_transition=True, log_whitelist=None, id_suffix=None, output_dir=None):
        The initializer for MissionManager

            task (str): For organization of saved data type is required to specify what task the MissionManager is preforming. For example a `MissionManager('trainer')` will log data under `generated_files/trainer/` in the current working directory.

            log (bool): Logging of agent transitions can be disabled by setting this to `False`.

            immediate_transition (bool): By default the the manager will assume that all messages received from BHV_Agents represent a new transition. If set to `False` one must manually tell set `msg.is_transition = True` on any objects returned from `get_message()`. This is helpful when you want to control what is considered a "state" in your Markov Decsion Process.

            log_whitelist (list): Setting this parameter will only log some transitions according to their reported `vnames`.

            id_suffix (str): Will be appended to the generated session id.

            output_dir (str): Path to a place to store files.
        self._msg_queue = Queue()

        self._vnames = []
        self._vname_lock = Lock()
        self._vehicle_count = 0
        self._episode_manager_states = {}
        self._ems_lock = Lock()
        self._episode_manager_nums = {}
        self._emn_lock = Lock()

        # Dict to hold queues of vnames to reset
        self._vresets = Queue()

        self._thread = None
        self._stop_signal = False

        if output_dir is None:
            output_dir = os.path.join(

        self._log_dir = LogDirectory(output_dir)
        self._id = self._init_session(id_suffix)

        # Calculate the path to the directories we will be writing to. This will be created when we first use them / return them to the user.
        self._model_path = os.path.join(
        self._log_path = os.path.join(
        self._model_path = os.path.abspath(self._model_path)
        self._log_path = os.path.abspath(self._log_path)

        self._log = log
        self._imm_transition = immediate_transition
        if self._log:
            self._log_whitelist = log_whitelist
            # Create data structs needed to log data from each vehicle
            self._logs = {}
            self._last_state = {}
            self._last_act = {}

            # Go ahead and create the log path

    def _init_session(self, id_suffix):
        # Start the session id with the current timestamp
        id = str(round(time.time()))

        # Add suffix if it exists
        if id_suffix is not None:
            id += f"-{id_suffix}"

        id = self._log_dir.meta.registry.register(id)

        return id

    def model_output_dir(self):
        if not os.path.isdir(self._model_path):
        return self._model_path

    def log_output_dir(self):
        Returns the log path for the current session so custom files can be added to it.
        assert self._log, "This method should not be used, when logging is disabled"
        return self._log_path

    def __enter__(self):
        return self

    def start(self):
        It is **not recommended** to use this method directly. Instead, consider using this class with the python context manager. This method starts a thread to read from the `ModelBridgeServer`.

          bool: False if thread has already been started, True otherwise
        if self._thread is not None:
            return False

        self._thread = Thread(target=self._server_thread, daemon=True)

        return True

    def _server_thread(self):
        live_msg_list = []
        address_map = {}
        with ModelBridgeServer() as server:
            while not self._stop_signal:
                # Accept new clients
                addr = server.accept()
                if addr is not None:
                    print(f'Got new connection: {addr}')
                    server.send_instr(addr, INSTR_SEND_STATE)

                # Listen for messages from vehicles
                for addr in server._clients:
                    msg = server.listen(addr)

                    if msg is not None:
                        with self._vname_lock:
                            if msg[KEY_ID] not in self._vnames:
                                print(f'Got new vehicle: {msg[KEY_ID]}')
                                vname = msg[KEY_ID]
                                address_map[vname] = addr
                                self._vehicle_count += 1

                        assert address_map[msg[KEY_ID]] == addr, "Vehicle changed vname. This violates routing / logging assumptions made by MissionManager"

                        m = MissionMessage(

                        with self._ems_lock:
                            self._episode_manager_states[m.vname] = m.episode_state
                        with self._emn_lock:
                            if m.episode_report is None:
                                self._episode_manager_nums[m.vname] = None
                                self._episode_manager_nums[m.vname] = m.episode_report['NUM']


                # Send responses to vehicle message if there are any
                for i, m in enumerate(live_msg_list):
                    with m._rsp_lock:
                        if m._response is None:

                        # If we got there is response send and remove from list
                        server.send_instr(m._addr, m._response)

                        # Do logging

                # Handle reseting of vehicles
                while not self._vresets.empty():
                    vname, success = self._vresets.get()

                    if vname not in address_map:
                        raise RuntimeError(
                            f'Received reset for unknown vehicle: {vname}')

                    instr = INSTR_RESET_FAILURE
                    if success:
                        instr = INSTR_RESET_SUCCESS

                    server.send_instr(address_map[vname], instr)

    # This message should only be called on msgs which have actions
    def _do_logging(self, msg):
        if not self._log:

        # Check in whitelist if exists
        if self._log_whitelist is not None:
            if msg.vname not in self._log_whitelist:

        # Check if this is a new vehicle
        if msg.vname not in self._logs:
            path = os.path.join(self._log_path, f"log_{msg.vname}")
            self._logs[msg.vname] = ProtoLogger(path, Transition, mode='w')

        if msg._is_transition:
            # Write a transition if this is not the first state ever
            if msg.vname in self._last_state:
                t = Transition()


            # Update the storage for next transition
            self._last_state[msg.vname] = msg.state
            self._last_act[msg.vname] = msg._response

    def are_present(self, vnames):
        Used to see if a specified list of vehicles has connected to the `MissionManager` instance yet.

        See also: [`wait_for()`][mivp_agent.manager.MissionManager.wait_for]

          vnames (iterable): A list / tuple of `str` values to look for
        for vname in vnames:
            with self._vname_lock:
                if vname not in self._vnames:
                    return False
        return True

    def wait_for(self, vnames, sleep=0.1):
        Used to block until a specified list of vehicles has connect to the `MissionManager` instance.

          vnames (iterable): A list / tuple of `str` values to look for
          sleep (float): Amount of time in seconds to sleep for between checks
        while not self.are_present(vnames):

    def get_message(self, block=True):
        Used as the primary method for receiving data from `BHV_Agent`.

        **NOTE:** Messages **MUST** be responded to as `BHV_Agent` will not send another update until it has a response to the last.

          block (bool): A boolean specifying if the method will wait until a message present or return immediately

          obj: A instance of [`MissionMessage()`][mivp_agent.manager.MissionMessage] or `None` depending on the blocking behavior

            msg = mgr.get_message()

            NAV_X = msg.state['NAV_X']
            NAV_Y = msg.state['NAV_Y']

            # ...
            # Some processing
            # ...

              'speed': 1.0
              'course': 180.0
            return self._msg_queue.get(block=block)
        except Empty:
            return None

    def get_vehicle_count(self):
          int: The amount of vehicles that have connected to this instance of `MissionManager`
        return self._vehicle_count

    def episode_state(self, vname):
        This is used to interrogate the state of a connected vehicle's `pEpisodeManager`

          vname (str): the vname of the vehicle

          str: The state of the `pEpisodeManager` on the vehicle 
        with self._ems_lock:
            # Should be all strings so no reference odd ness
            return self._episode_manager_states[vname]

    def episode_nums(self):
          dict: A key, value pair maping vnames to the episode numbers of the `pEpisodeManager` app on that vehicle
        with self._emn_lock:
            return self._episode_manager_nums.copy()

    def reset_vehicle(self, vname, success=False):
        # Untested
        self._vresets.append((vname, success))

    def close(self):
        if self._thread is not None:
            self._stop_signal = True
        if self._log:
            for vehicle in self._logs:

    def __exit__(self, exc_type, exc_value, traceback):

__init__(self, task, log=True, immediate_transition=True, log_whitelist=None, id_suffix=None, output_dir=None) special

The initializer for MissionManager


Name Type Description Default
task str

For organization of saved data type is required to specify what task the MissionManager is preforming. For example a MissionManager('trainer') will log data under generated_files/trainer/ in the current working directory.

log bool

Logging of agent transitions can be disabled by setting this to False.

immediate_transition bool

By default the the manager will assume that all messages received from BHV_Agents represent a new transition. If set to False one must manually tell set msg.is_transition = True on any objects returned from get_message(). This is helpful when you want to control what is considered a "state" in your Markov Decsion Process.

log_whitelist list

Setting this parameter will only log some transitions according to their reported vnames.

id_suffix str

Will be appended to the generated session id.

output_dir str

Path to a place to store files.

Source code in mivp_agent/
def __init__(self, task, log=True, immediate_transition=True, log_whitelist=None, id_suffix=None, output_dir=None):
    The initializer for MissionManager

        task (str): For organization of saved data type is required to specify what task the MissionManager is preforming. For example a `MissionManager('trainer')` will log data under `generated_files/trainer/` in the current working directory.

        log (bool): Logging of agent transitions can be disabled by setting this to `False`.

        immediate_transition (bool): By default the the manager will assume that all messages received from BHV_Agents represent a new transition. If set to `False` one must manually tell set `msg.is_transition = True` on any objects returned from `get_message()`. This is helpful when you want to control what is considered a "state" in your Markov Decsion Process.

        log_whitelist (list): Setting this parameter will only log some transitions according to their reported `vnames`.

        id_suffix (str): Will be appended to the generated session id.

        output_dir (str): Path to a place to store files.
    self._msg_queue = Queue()

    self._vnames = []
    self._vname_lock = Lock()
    self._vehicle_count = 0
    self._episode_manager_states = {}
    self._ems_lock = Lock()
    self._episode_manager_nums = {}
    self._emn_lock = Lock()

    # Dict to hold queues of vnames to reset
    self._vresets = Queue()

    self._thread = None
    self._stop_signal = False

    if output_dir is None:
        output_dir = os.path.join(

    self._log_dir = LogDirectory(output_dir)
    self._id = self._init_session(id_suffix)

    # Calculate the path to the directories we will be writing to. This will be created when we first use them / return them to the user.
    self._model_path = os.path.join(
    self._log_path = os.path.join(
    self._model_path = os.path.abspath(self._model_path)
    self._log_path = os.path.abspath(self._log_path)

    self._log = log
    self._imm_transition = immediate_transition
    if self._log:
        self._log_whitelist = log_whitelist
        # Create data structs needed to log data from each vehicle
        self._logs = {}
        self._last_state = {}
        self._last_act = {}

        # Go ahead and create the log path

are_present(self, vnames)

Used to see if a specified list of vehicles has connected to the MissionManager instance yet.

See also: wait_for()


Name Type Description Default
vnames iterable

A list / tuple of str values to look for

Source code in mivp_agent/
def are_present(self, vnames):
    Used to see if a specified list of vehicles has connected to the `MissionManager` instance yet.

    See also: [`wait_for()`][mivp_agent.manager.MissionManager.wait_for]

      vnames (iterable): A list / tuple of `str` values to look for
    for vname in vnames:
        with self._vname_lock:
            if vname not in self._vnames:
                return False
    return True



Type Description

A key, value pair maping vnames to the episode numbers of the pEpisodeManager app on that vehicle

Source code in mivp_agent/
def episode_nums(self):
      dict: A key, value pair maping vnames to the episode numbers of the `pEpisodeManager` app on that vehicle
    with self._emn_lock:
        return self._episode_manager_nums.copy()

episode_state(self, vname)

This is used to interrogate the state of a connected vehicle's pEpisodeManager


Name Type Description Default
vname str

the vname of the vehicle



Type Description

The state of the pEpisodeManager on the vehicle

Source code in mivp_agent/
def episode_state(self, vname):
    This is used to interrogate the state of a connected vehicle's `pEpisodeManager`

      vname (str): the vname of the vehicle

      str: The state of the `pEpisodeManager` on the vehicle 
    with self._ems_lock:
        # Should be all strings so no reference odd ness
        return self._episode_manager_states[vname]

get_message(self, block=True)

Used as the primary method for receiving data from BHV_Agent.

NOTE: Messages MUST be responded to as BHV_Agent will not send another update until it has a response to the last.


Name Type Description Default
block bool

A boolean specifying if the method will wait until a message present or return immediately



Type Description

A instance of MissionMessage() or None depending on the blocking behavior


  msg = mgr.get_message()

  NAV_X = msg.state['NAV_X']
  NAV_Y = msg.state['NAV_Y']

  # ...
  # Some processing
  # ...

    'speed': 1.0
    'course': 180.0
Source code in mivp_agent/
def get_message(self, block=True):
    Used as the primary method for receiving data from `BHV_Agent`.

    **NOTE:** Messages **MUST** be responded to as `BHV_Agent` will not send another update until it has a response to the last.

      block (bool): A boolean specifying if the method will wait until a message present or return immediately

      obj: A instance of [`MissionMessage()`][mivp_agent.manager.MissionMessage] or `None` depending on the blocking behavior

        msg = mgr.get_message()

        NAV_X = msg.state['NAV_X']
        NAV_Y = msg.state['NAV_Y']

        # ...
        # Some processing
        # ...

          'speed': 1.0
          'course': 180.0
        return self._msg_queue.get(block=block)
    except Empty:
        return None



Type Description

The amount of vehicles that have connected to this instance of MissionManager

Source code in mivp_agent/
def get_vehicle_count(self):
      int: The amount of vehicles that have connected to this instance of `MissionManager`
    return self._vehicle_count


Returns the log path for the current session so custom files can be added to it.

Source code in mivp_agent/
def log_output_dir(self):
    Returns the log path for the current session so custom files can be added to it.
    assert self._log, "This method should not be used, when logging is disabled"
    return self._log_path


It is not recommended to use this method directly. Instead, consider using this class with the python context manager. This method starts a thread to read from the ModelBridgeServer.


Type Description

False if thread has already been started, True otherwise

Source code in mivp_agent/
def start(self):
    It is **not recommended** to use this method directly. Instead, consider using this class with the python context manager. This method starts a thread to read from the `ModelBridgeServer`.

      bool: False if thread has already been started, True otherwise
    if self._thread is not None:
        return False

    self._thread = Thread(target=self._server_thread, daemon=True)

    return True

wait_for(self, vnames, sleep=0.1)

Used to block until a specified list of vehicles has connect to the MissionManager instance.


Name Type Description Default
vnames iterable

A list / tuple of str values to look for

sleep float

Amount of time in seconds to sleep for between checks

Source code in mivp_agent/
def wait_for(self, vnames, sleep=0.1):
    Used to block until a specified list of vehicles has connect to the `MissionManager` instance.

      vnames (iterable): A list / tuple of `str` values to look for
      sleep (float): Amount of time in seconds to sleep for between checks
    while not self.are_present(vnames):