Skip to content

Module wtracker.sim.sim_controllers.logging_controller

View Source
from collections import deque

import numpy as np

from dataclasses import dataclass, field

from copy import deepcopy

from wtracker.sim.simulator import Simulator, SimController

from wtracker.utils.io_utils import ImageSaver, FrameSaver

from wtracker.utils.log_utils import CSVLogger

from wtracker.utils.config_base import ConfigBase

from wtracker.utils.path_utils import join_paths, create_parent_directory

from wtracker.utils.bbox_utils import BoxUtils, BoxFormat

@dataclass

class LogConfig(ConfigBase):

    root_folder: str

    """The directory where the logs will be saved into."""

    save_mic_view: bool = False

    """Whether to save the microscope view of each frame."""

    save_cam_view: bool = False

    """Whether to save the camera view of each frame."""

    save_err_view: bool = True

    """Whether to camera view of frames in which no prediction was made."""

    save_wrm_view: bool = False

    """whether to save the detected worm head of each frame."""

    mic_folder_name: str = "micro"

    cam_folder_name: str = "camera"

    err_folder_name: str = "errors"

    wrm_folder_name: str = "worms"

    # TODO: WHY DO WE SAVE IN PNG FORMAT AND NOT BMP?

    bbox_file_name: str = "bboxes.csv"

    mic_file_name: str = "mic_{:09d}.png"

    cam_file_name: str = "cam_{:09d}.png"

    wrm_file_name: str = "wrm_{:09d}.png"

    mic_file_path: str = field(init=False)

    cam_file_path: str = field(init=False)

    err_file_path: str = field(init=False)

    wrm_file_path: str = field(init=False)

    bbox_file_path: str = field(init=False)

    def __post_init__(self):

        self.mic_file_path = join_paths(self.root_folder, self.mic_folder_name, self.mic_file_name)

        self.cam_file_path = join_paths(self.root_folder, self.cam_folder_name, self.cam_file_name)

        self.err_file_path = join_paths(self.root_folder, self.err_folder_name, self.cam_file_name)

        self.wrm_file_path = join_paths(self.root_folder, self.wrm_folder_name, self.wrm_file_name)

        self.bbox_file_path = join_paths(self.root_folder, self.bbox_file_name)

    def create_dirs(self) -> None:

        create_parent_directory(self.bbox_file_path)

        create_parent_directory(self.mic_file_path)

        create_parent_directory(self.cam_file_path)

        create_parent_directory(self.err_file_path)

        create_parent_directory(self.wrm_file_path)

class LoggingController(SimController):

    def __init__(

        self,

        sim_controller: SimController,

        log_config: LogConfig,

    ):

        super().__init__(sim_controller.timing_config)

        self.sim_controller = sim_controller

        self.log_config = log_config

        self._camera_frames = deque(maxlen=self.timing_config.cycle_frame_num)

        self._platform_positions = deque(maxlen=self.timing_config.cycle_frame_num)

        self._camera_bboxes = deque(maxlen=self.timing_config.cycle_frame_num)

        self._micro_bboxes = deque(maxlen=self.timing_config.cycle_frame_num)

    def on_sim_start(self, sim: Simulator):

        self.sim_controller.on_sim_start(sim)

        self._camera_frames.clear()

        self._platform_positions.clear()

        self._camera_bboxes.clear()

        self._micro_bboxes.clear()

        self.log_config.create_dirs()

        self._image_saver = ImageSaver(tqdm=True)

        self._image_saver.start()

        self._frame_saver = FrameSaver(deepcopy(sim.view._frame_reader), tqdm=True)

        self._frame_saver.start()

        self._bbox_logger = CSVLogger(

            self.log_config.bbox_file_path,

            col_names=[

                "frame",

                "cycle",

                "phase",

                "plt_x",

                "plt_y",

                "cam_x",

                "cam_y",

                "cam_w",

                "cam_h",

                "mic_x",

                "mic_y",

                "mic_w",

                "mic_h",

                "wrm_x",

                "wrm_y",

                "wrm_w",

                "wrm_h",

            ],

        )

    def on_cycle_start(self, sim: Simulator):

        self.sim_controller.on_cycle_start(sim)

    def on_camera_frame(self, sim: Simulator):

        self.sim_controller.on_camera_frame(sim)

        # log everything

        self._platform_positions.append(sim.position)

        self._camera_bboxes.append(sim.view.camera_position)

        self._micro_bboxes.append(sim.view.micro_position)

        if self.log_config.save_err_view:

            cam_view = sim.camera_view()

            self._camera_frames.append(cam_view)

        if self.log_config.save_cam_view:

            # save camera view

            cam_view = sim.camera_view()

            path = self.log_config.cam_file_path.format(sim.frame_number)

            self._image_saver.schedule_save(cam_view, path)

        if self.log_config.save_mic_view:

            # save micro view

            mic_view = sim.view.micro_view()

            path = self.log_config.mic_file_path.format(sim.frame_number)

            self._image_saver.schedule_save(mic_view, path)

    def _log_cycle(self, sim: Simulator):

        cycle_number = sim.cycle_number - 1

        frame_offset = cycle_number * self.timing_config.cycle_frame_num

        worm_bboxes = self.sim_controller._cycle_predict_all(sim)

        cam_bboxes = np.asanyarray(list(self._camera_bboxes))

        # make worm bboxes coordinate absolute

        worm_bboxes[:, 0] += cam_bboxes[:, 0]

        worm_bboxes[:, 1] += cam_bboxes[:, 1]

        # calc the crop dims to get the worm view from the original frame

        (H, W) = sim.experiment_config.orig_resolution

        crop_dims, is_crop_legal = BoxUtils.discretize(worm_bboxes, (H, W), BoxFormat.XYWH)

        for i, worm_bbox in enumerate(worm_bboxes):

            frame_number = frame_offset + i

            # if no prediction and we're saving error frames

            if not np.isfinite(worm_bbox).all() and self.log_config.save_err_view:

                err_view = self._camera_frames[i]

                path = self.log_config.err_file_path.format(frame_number)

                self._image_saver.schedule_save(img=err_view, img_name=path)

            # save cropped worm view if crop is legal

            if self.log_config.save_wrm_view and is_crop_legal[i]:

                crop_dim = crop_dims[i]

                path = self.log_config.wrm_file_path.format(frame_number)

                self._frame_saver.schedule_save(img_index=frame_number, crop_dims=crop_dim, img_name=path)

            csv_row = {}

            csv_row["plt_x"], csv_row["plt_y"] = self._platform_positions[i]

            csv_row["cam_x"], csv_row["cam_y"], csv_row["cam_w"], csv_row["cam_h"] = self._camera_bboxes[i]

            csv_row["mic_x"], csv_row["mic_y"], csv_row["mic_w"], csv_row["mic_h"] = self._micro_bboxes[i]

            csv_row["cycle"] = cycle_number

            csv_row["frame"] = frame_number

            csv_row["phase"] = "imaging" if i < self.timing_config.imaging_frame_num else "moving"

            csv_row["wrm_x"], csv_row["wrm_y"], csv_row["wrm_w"], csv_row["wrm_h"] = worm_bbox

            self._bbox_logger.write(csv_row)

        self._bbox_logger.flush()

    def on_cycle_end(self, sim: Simulator):

        self._log_cycle(sim)

        self.sim_controller.on_cycle_end(sim)

        self._camera_frames.clear()

        self._platform_positions.clear()

        self._camera_bboxes.clear()

        self._micro_bboxes.clear()

    def on_sim_end(self, sim: Simulator):

        self.sim_controller.on_sim_end(sim)

        self._image_saver.close()

        self._frame_saver.close()

        self._bbox_logger.close()

    def on_imaging_start(self, sim: Simulator):

        self.sim_controller.on_imaging_start(sim)

    def on_micro_frame(self, sim: Simulator):

        self.sim_controller.on_micro_frame(sim)

    def on_imaging_end(self, sim: Simulator):

        self.sim_controller.on_imaging_end(sim)

    def on_movement_start(self, sim: Simulator):

        self.sim_controller.on_movement_start(sim)

    def on_movement_end(self, sim: Simulator):

        self.sim_controller.on_movement_end(sim)

    def begin_movement_prediction(self, sim: Simulator) -> None:

        return self.sim_controller.begin_movement_prediction(sim)

    def provide_movement_vector(self, sim: Simulator) -> tuple[int, int]:

        return self.sim_controller.provide_movement_vector(sim)

    def _cycle_predict_all(self, sim: Simulator) -> np.ndarray:

        return self.sim_controller._cycle_predict_all(sim)

Classes

LogConfig

class LogConfig(
    root_folder: str,
    save_mic_view: bool = False,
    save_cam_view: bool = False,
    save_err_view: bool = True,
    save_wrm_view: bool = False,
    mic_folder_name: str = 'micro',
    cam_folder_name: str = 'camera',
    err_folder_name: str = 'errors',
    wrm_folder_name: str = 'worms',
    bbox_file_name: str = 'bboxes.csv',
    mic_file_name: str = 'mic_{:09d}.png',
    cam_file_name: str = 'cam_{:09d}.png',
    wrm_file_name: str = 'wrm_{:09d}.png'
)

LogConfig(root_folder: str, save_mic_view: bool = False, save_cam_view: bool = False, save_err_view: bool = True, save_wrm_view: bool = False, mic_folder_name: str = 'micro', cam_folder_name: str = 'camera', err_folder_name: str = 'errors', wrm_folder_name: str = 'worms', bbox_file_name: str = 'bboxes.csv', mic_file_name: str = 'mic_{:09d}.png', cam_file_name: str = 'cam_{:09d}.png', wrm_file_name: str = 'wrm_{:09d}.png')

View Source
@dataclass

class LogConfig(ConfigBase):

    root_folder: str

    """The directory where the logs will be saved into."""

    save_mic_view: bool = False

    """Whether to save the microscope view of each frame."""

    save_cam_view: bool = False

    """Whether to save the camera view of each frame."""

    save_err_view: bool = True

    """Whether to camera view of frames in which no prediction was made."""

    save_wrm_view: bool = False

    """whether to save the detected worm head of each frame."""

    mic_folder_name: str = "micro"

    cam_folder_name: str = "camera"

    err_folder_name: str = "errors"

    wrm_folder_name: str = "worms"

    # TODO: WHY DO WE SAVE IN PNG FORMAT AND NOT BMP?

    bbox_file_name: str = "bboxes.csv"

    mic_file_name: str = "mic_{:09d}.png"

    cam_file_name: str = "cam_{:09d}.png"

    wrm_file_name: str = "wrm_{:09d}.png"

    mic_file_path: str = field(init=False)

    cam_file_path: str = field(init=False)

    err_file_path: str = field(init=False)

    wrm_file_path: str = field(init=False)

    bbox_file_path: str = field(init=False)

    def __post_init__(self):

        self.mic_file_path = join_paths(self.root_folder, self.mic_folder_name, self.mic_file_name)

        self.cam_file_path = join_paths(self.root_folder, self.cam_folder_name, self.cam_file_name)

        self.err_file_path = join_paths(self.root_folder, self.err_folder_name, self.cam_file_name)

        self.wrm_file_path = join_paths(self.root_folder, self.wrm_folder_name, self.wrm_file_name)

        self.bbox_file_path = join_paths(self.root_folder, self.bbox_file_name)

    def create_dirs(self) -> None:

        create_parent_directory(self.bbox_file_path)

        create_parent_directory(self.mic_file_path)

        create_parent_directory(self.cam_file_path)

        create_parent_directory(self.err_file_path)

        create_parent_directory(self.wrm_file_path)

Ancestors (in MRO)

  • wtracker.utils.config_base.ConfigBase

Class variables

bbox_file_name
cam_file_name
cam_folder_name
err_folder_name
mic_file_name
mic_folder_name
save_cam_view
save_err_view
save_mic_view
save_wrm_view
wrm_file_name
wrm_folder_name

Static methods

load_json

def load_json(
    path: 'str' = None
) -> 'T'

Load the class from a JSON file.

Parameters:

Name Type Description Default
path str The path to the JSON file. None

Returns:

Type Description
ConfigBase The class loaded from the JSON file.
View Source
    @classmethod

    def load_json(cls: type[T], path: str = None) -> T:

        """

        Load the class from a JSON file.

        Args:

            path (str): The path to the JSON file.

        Returns:

            ConfigBase: The class loaded from the JSON file.

        """

        if path is None:

            path = UserPrompt.open_file(

                title=f"Open {cls.__name__} File",

                file_types=[("json", ".json")],

            )

        with open(path, "r") as f:

            data = json.load(f)

        obj = cls.__new__(cls)

        obj.__dict__.update(data)

        return obj

load_pickle

def load_pickle(
    path: 'str' = None
) -> 'T'

Load the class from a pickle file.

Parameters:

Name Type Description Default
path str The path to the pickle file. None

Returns:

Type Description
None The class loaded from the pickle file.
View Source
    @classmethod

    def load_pickle(cls: type[T], path: str = None) -> T:

        """

        Load the class from a pickle file.

        Args:

            path (str): The path to the pickle file.

        Returns:

            The class loaded from the pickle file.

        """

        if path is None:

            path = UserPrompt.open_file(

                title=f"Open {cls.__name__} File",

                file_types=[("pickle", ".pkl")],

            )

        return pickle_load_object(path)

Methods

create_dirs

def create_dirs(
    self
) -> None
View Source
    def create_dirs(self) -> None:

        create_parent_directory(self.bbox_file_path)

        create_parent_directory(self.mic_file_path)

        create_parent_directory(self.cam_file_path)

        create_parent_directory(self.err_file_path)

        create_parent_directory(self.wrm_file_path)

save_json

def save_json(
    self,
    path: 'str' = None
)

Saves the class as JSON file.

Parameters:

Name Type Description Default
path str The path to the output JSON file. None
View Source
    def save_json(self, path: str = None):

        """

        Saves the class as JSON file.

        Args:

            path (str): The path to the output JSON file.

        """

        if path is None:

            path = UserPrompt.save_file(

                title=f"Save {type(self).__name__} As",

                file_types=[("json", ".json")],

                defaultextension=".json",

            )

        with open(path, "w") as f:

            json.dump(self.__dict__, f, indent=4)

save_pickle

def save_pickle(
    self,
    path: 'str' = None
) -> 'None'

Saves the class as a pickle file.

Parameters:

Name Type Description Default
path str The path to the output pickle file. None
View Source
    def save_pickle(self, path: str = None) -> None:

        """

        Saves the class as a pickle file.

        Args:

            path (str): The path to the output pickle file.

        """

        if path is None:

            path = UserPrompt.save_file(

                title=f"Save {type(self).__name__} As",

                file_types=[("pickle", ".pkl")],

                defaultextension=".pkl",

            )

        pickle_save_object(self, path)

LoggingController

class LoggingController(
    sim_controller: wtracker.sim.simulator.SimController,
    log_config: wtracker.sim.sim_controllers.logging_controller.LogConfig
)

Abstract base class for simulator controllers.

Attributes

Name Type Description Default
timing_config TimingConfig The timing configuration for the simulator. None
View Source
class LoggingController(SimController):

    def __init__(

        self,

        sim_controller: SimController,

        log_config: LogConfig,

    ):

        super().__init__(sim_controller.timing_config)

        self.sim_controller = sim_controller

        self.log_config = log_config

        self._camera_frames = deque(maxlen=self.timing_config.cycle_frame_num)

        self._platform_positions = deque(maxlen=self.timing_config.cycle_frame_num)

        self._camera_bboxes = deque(maxlen=self.timing_config.cycle_frame_num)

        self._micro_bboxes = deque(maxlen=self.timing_config.cycle_frame_num)

    def on_sim_start(self, sim: Simulator):

        self.sim_controller.on_sim_start(sim)

        self._camera_frames.clear()

        self._platform_positions.clear()

        self._camera_bboxes.clear()

        self._micro_bboxes.clear()

        self.log_config.create_dirs()

        self._image_saver = ImageSaver(tqdm=True)

        self._image_saver.start()

        self._frame_saver = FrameSaver(deepcopy(sim.view._frame_reader), tqdm=True)

        self._frame_saver.start()

        self._bbox_logger = CSVLogger(

            self.log_config.bbox_file_path,

            col_names=[

                "frame",

                "cycle",

                "phase",

                "plt_x",

                "plt_y",

                "cam_x",

                "cam_y",

                "cam_w",

                "cam_h",

                "mic_x",

                "mic_y",

                "mic_w",

                "mic_h",

                "wrm_x",

                "wrm_y",

                "wrm_w",

                "wrm_h",

            ],

        )

    def on_cycle_start(self, sim: Simulator):

        self.sim_controller.on_cycle_start(sim)

    def on_camera_frame(self, sim: Simulator):

        self.sim_controller.on_camera_frame(sim)

        # log everything

        self._platform_positions.append(sim.position)

        self._camera_bboxes.append(sim.view.camera_position)

        self._micro_bboxes.append(sim.view.micro_position)

        if self.log_config.save_err_view:

            cam_view = sim.camera_view()

            self._camera_frames.append(cam_view)

        if self.log_config.save_cam_view:

            # save camera view

            cam_view = sim.camera_view()

            path = self.log_config.cam_file_path.format(sim.frame_number)

            self._image_saver.schedule_save(cam_view, path)

        if self.log_config.save_mic_view:

            # save micro view

            mic_view = sim.view.micro_view()

            path = self.log_config.mic_file_path.format(sim.frame_number)

            self._image_saver.schedule_save(mic_view, path)

    def _log_cycle(self, sim: Simulator):

        cycle_number = sim.cycle_number - 1

        frame_offset = cycle_number * self.timing_config.cycle_frame_num

        worm_bboxes = self.sim_controller._cycle_predict_all(sim)

        cam_bboxes = np.asanyarray(list(self._camera_bboxes))

        # make worm bboxes coordinate absolute

        worm_bboxes[:, 0] += cam_bboxes[:, 0]

        worm_bboxes[:, 1] += cam_bboxes[:, 1]

        # calc the crop dims to get the worm view from the original frame

        (H, W) = sim.experiment_config.orig_resolution

        crop_dims, is_crop_legal = BoxUtils.discretize(worm_bboxes, (H, W), BoxFormat.XYWH)

        for i, worm_bbox in enumerate(worm_bboxes):

            frame_number = frame_offset + i

            # if no prediction and we're saving error frames

            if not np.isfinite(worm_bbox).all() and self.log_config.save_err_view:

                err_view = self._camera_frames[i]

                path = self.log_config.err_file_path.format(frame_number)

                self._image_saver.schedule_save(img=err_view, img_name=path)

            # save cropped worm view if crop is legal

            if self.log_config.save_wrm_view and is_crop_legal[i]:

                crop_dim = crop_dims[i]

                path = self.log_config.wrm_file_path.format(frame_number)

                self._frame_saver.schedule_save(img_index=frame_number, crop_dims=crop_dim, img_name=path)

            csv_row = {}

            csv_row["plt_x"], csv_row["plt_y"] = self._platform_positions[i]

            csv_row["cam_x"], csv_row["cam_y"], csv_row["cam_w"], csv_row["cam_h"] = self._camera_bboxes[i]

            csv_row["mic_x"], csv_row["mic_y"], csv_row["mic_w"], csv_row["mic_h"] = self._micro_bboxes[i]

            csv_row["cycle"] = cycle_number

            csv_row["frame"] = frame_number

            csv_row["phase"] = "imaging" if i < self.timing_config.imaging_frame_num else "moving"

            csv_row["wrm_x"], csv_row["wrm_y"], csv_row["wrm_w"], csv_row["wrm_h"] = worm_bbox

            self._bbox_logger.write(csv_row)

        self._bbox_logger.flush()

    def on_cycle_end(self, sim: Simulator):

        self._log_cycle(sim)

        self.sim_controller.on_cycle_end(sim)

        self._camera_frames.clear()

        self._platform_positions.clear()

        self._camera_bboxes.clear()

        self._micro_bboxes.clear()

    def on_sim_end(self, sim: Simulator):

        self.sim_controller.on_sim_end(sim)

        self._image_saver.close()

        self._frame_saver.close()

        self._bbox_logger.close()

    def on_imaging_start(self, sim: Simulator):

        self.sim_controller.on_imaging_start(sim)

    def on_micro_frame(self, sim: Simulator):

        self.sim_controller.on_micro_frame(sim)

    def on_imaging_end(self, sim: Simulator):

        self.sim_controller.on_imaging_end(sim)

    def on_movement_start(self, sim: Simulator):

        self.sim_controller.on_movement_start(sim)

    def on_movement_end(self, sim: Simulator):

        self.sim_controller.on_movement_end(sim)

    def begin_movement_prediction(self, sim: Simulator) -> None:

        return self.sim_controller.begin_movement_prediction(sim)

    def provide_movement_vector(self, sim: Simulator) -> tuple[int, int]:

        return self.sim_controller.provide_movement_vector(sim)

    def _cycle_predict_all(self, sim: Simulator) -> np.ndarray:

        return self.sim_controller._cycle_predict_all(sim)

Ancestors (in MRO)

  • wtracker.sim.simulator.SimController
  • abc.ABC

Methods

begin_movement_prediction

def begin_movement_prediction(
    self,
    sim: wtracker.sim.simulator.Simulator
) -> None

Called when the movement prediction begins.

View Source
    def begin_movement_prediction(self, sim: Simulator) -> None:

        return self.sim_controller.begin_movement_prediction(sim)

on_camera_frame

def on_camera_frame(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when a camera frame is captured. Happens every frame.

View Source
    def on_camera_frame(self, sim: Simulator):

        self.sim_controller.on_camera_frame(sim)

        # log everything

        self._platform_positions.append(sim.position)

        self._camera_bboxes.append(sim.view.camera_position)

        self._micro_bboxes.append(sim.view.micro_position)

        if self.log_config.save_err_view:

            cam_view = sim.camera_view()

            self._camera_frames.append(cam_view)

        if self.log_config.save_cam_view:

            # save camera view

            cam_view = sim.camera_view()

            path = self.log_config.cam_file_path.format(sim.frame_number)

            self._image_saver.schedule_save(cam_view, path)

        if self.log_config.save_mic_view:

            # save micro view

            mic_view = sim.view.micro_view()

            path = self.log_config.mic_file_path.format(sim.frame_number)

            self._image_saver.schedule_save(mic_view, path)

on_cycle_end

def on_cycle_end(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when a cycle ends.

View Source
    def on_cycle_end(self, sim: Simulator):

        self._log_cycle(sim)

        self.sim_controller.on_cycle_end(sim)

        self._camera_frames.clear()

        self._platform_positions.clear()

        self._camera_bboxes.clear()

        self._micro_bboxes.clear()

on_cycle_start

def on_cycle_start(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when a new cycle starts.

View Source
    def on_cycle_start(self, sim: Simulator):

        self.sim_controller.on_cycle_start(sim)

on_imaging_end

def on_imaging_end(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when imaging phase ends.

View Source
    def on_imaging_end(self, sim: Simulator):

        self.sim_controller.on_imaging_end(sim)

on_imaging_start

def on_imaging_start(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when imaging phase starts.

View Source
    def on_imaging_start(self, sim: Simulator):

        self.sim_controller.on_imaging_start(sim)

on_micro_frame

def on_micro_frame(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when a micro frame is captured. Happens for every during the imaging phase.

View Source
    def on_micro_frame(self, sim: Simulator):

        self.sim_controller.on_micro_frame(sim)

on_movement_end

def on_movement_end(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when movement phase ends.

View Source
    def on_movement_end(self, sim: Simulator):

        self.sim_controller.on_movement_end(sim)

on_movement_start

def on_movement_start(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when movement phase starts.

View Source
    def on_movement_start(self, sim: Simulator):

        self.sim_controller.on_movement_start(sim)

on_sim_end

def on_sim_end(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when the simulation ends.

View Source
    def on_sim_end(self, sim: Simulator):

        self.sim_controller.on_sim_end(sim)

        self._image_saver.close()

        self._frame_saver.close()

        self._bbox_logger.close()

on_sim_start

def on_sim_start(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when the simulation starts.

View Source
    def on_sim_start(self, sim: Simulator):

        self.sim_controller.on_sim_start(sim)

        self._camera_frames.clear()

        self._platform_positions.clear()

        self._camera_bboxes.clear()

        self._micro_bboxes.clear()

        self.log_config.create_dirs()

        self._image_saver = ImageSaver(tqdm=True)

        self._image_saver.start()

        self._frame_saver = FrameSaver(deepcopy(sim.view._frame_reader), tqdm=True)

        self._frame_saver.start()

        self._bbox_logger = CSVLogger(

            self.log_config.bbox_file_path,

            col_names=[

                "frame",

                "cycle",

                "phase",

                "plt_x",

                "plt_y",

                "cam_x",

                "cam_y",

                "cam_w",

                "cam_h",

                "mic_x",

                "mic_y",

                "mic_w",

                "mic_h",

                "wrm_x",

                "wrm_y",

                "wrm_w",

                "wrm_h",

            ],

        )

provide_movement_vector

def provide_movement_vector(
    self,
    sim: wtracker.sim.simulator.Simulator
) -> tuple[int, int]

Provides the movement vector for the simulator. The platform is moved by the provided vector.

Returns:

Type Description
tuple[int, int] The movement vector in format (dx, dy).
The platform will be moved by dx pixels in the x-direction and dy pixels in the y-direction.
View Source
    def provide_movement_vector(self, sim: Simulator) -> tuple[int, int]:

        return self.sim_controller.provide_movement_vector(sim)