Skip to content

Module wtracker.sim.sim_controllers.optimal_controller

View Source
import numpy as np

from wtracker.sim.config import TimingConfig

from wtracker.sim.simulator import Simulator

from wtracker.sim.sim_controllers.csv_controller import CsvController

class OptimalController(CsvController):

    def __init__(self, timing_config: TimingConfig, csv_path: str):

        super().__init__(timing_config, csv_path)

        self._csv_centers = np.empty((len(self._csv_data), 2), dtype=self._csv_data.dtype)

        self._csv_centers[:, 0] = self._csv_data[:, 0] + self._csv_data[:, 2] / 2

        self._csv_centers[:, 1] = self._csv_data[:, 1] + self._csv_data[:, 3] / 2

    def provide_movement_vector(self, sim: Simulator) -> tuple[int, int]:

        # extract portion matching next imaging phase

        next_imaging_start = (sim.cycle_number + 1) * self.timing_config.cycle_frame_num

        next_imaging_end = next_imaging_start + self.timing_config.imaging_frame_num

        next_imaging = self._csv_centers[next_imaging_start:next_imaging_end, :]

        next_imaging = next_imaging[np.isfinite(next_imaging).all(axis=1)]

        if len(next_imaging) == 0:

            return 0, 0

        x_next, y_next = np.median(next_imaging, axis=0)

        cam_x, cam_y, cam_w, cam_h = sim.view.camera_position

        cam_mid = cam_x + cam_w / 2, cam_y + cam_h / 2

        return round(x_next - cam_mid[0]), round(y_next - cam_mid[1])

Classes

OptimalController

class OptimalController(
    timing_config: wtracker.sim.config.TimingConfig,
    csv_path: str
)

Abstract base class for simulator controllers.

Attributes

Name Type Description Default
timing_config TimingConfig The timing configuration for the simulator. None
View Source
class OptimalController(CsvController):

    def __init__(self, timing_config: TimingConfig, csv_path: str):

        super().__init__(timing_config, csv_path)

        self._csv_centers = np.empty((len(self._csv_data), 2), dtype=self._csv_data.dtype)

        self._csv_centers[:, 0] = self._csv_data[:, 0] + self._csv_data[:, 2] / 2

        self._csv_centers[:, 1] = self._csv_data[:, 1] + self._csv_data[:, 3] / 2

    def provide_movement_vector(self, sim: Simulator) -> tuple[int, int]:

        # extract portion matching next imaging phase

        next_imaging_start = (sim.cycle_number + 1) * self.timing_config.cycle_frame_num

        next_imaging_end = next_imaging_start + self.timing_config.imaging_frame_num

        next_imaging = self._csv_centers[next_imaging_start:next_imaging_end, :]

        next_imaging = next_imaging[np.isfinite(next_imaging).all(axis=1)]

        if len(next_imaging) == 0:

            return 0, 0

        x_next, y_next = np.median(next_imaging, axis=0)

        cam_x, cam_y, cam_w, cam_h = sim.view.camera_position

        cam_mid = cam_x + cam_w / 2, cam_y + cam_h / 2

        return round(x_next - cam_mid[0]), round(y_next - cam_mid[1])

Ancestors (in MRO)

  • wtracker.sim.sim_controllers.csv_controller.CsvController
  • wtracker.sim.simulator.SimController
  • abc.ABC

Methods

begin_movement_prediction

def begin_movement_prediction(
    self,
    sim: wtracker.sim.simulator.Simulator
) -> None

Called when the movement prediction begins.

View Source
    def begin_movement_prediction(self, sim: Simulator) -> None:

        pass

on_camera_frame

def on_camera_frame(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when a camera frame is captured. Happens every frame.

View Source
    def on_camera_frame(self, sim: Simulator):

        self._camera_bboxes.append(sim.view.camera_position)

on_cycle_end

def on_cycle_end(
    self,
    sim: 'Simulator'
)

Called when a cycle ends.

View Source
    def on_cycle_end(self, sim: Simulator):

        """

        Called when a cycle ends.

        """

        pass

on_cycle_start

def on_cycle_start(
    self,
    sim: 'Simulator'
)

Called when a new cycle starts.

View Source
    def on_cycle_start(self, sim: Simulator):

        """

        Called when a new cycle starts.

        """

        pass

on_imaging_end

def on_imaging_end(
    self,
    sim: 'Simulator'
)

Called when imaging phase ends.

View Source
    def on_imaging_end(self, sim: Simulator):

        """

        Called when imaging phase ends.

        """

        pass

on_imaging_start

def on_imaging_start(
    self,
    sim: 'Simulator'
)

Called when imaging phase starts.

View Source
    def on_imaging_start(self, sim: Simulator):

        """

        Called when imaging phase starts.

        """

        pass

on_micro_frame

def on_micro_frame(
    self,
    sim: 'Simulator'
)

Called when a micro frame is captured. Happens for every during the imaging phase.

View Source
    def on_micro_frame(self, sim: Simulator):

        """

        Called when a micro frame is captured. Happens for every during the imaging phase.

        """

        pass

on_movement_end

def on_movement_end(
    self,
    sim: 'Simulator'
)

Called when movement phase ends.

View Source
    def on_movement_end(self, sim: Simulator):

        """

        Called when movement phase ends.

        """

        pass

on_movement_start

def on_movement_start(
    self,
    sim: 'Simulator'
)

Called when movement phase starts.

View Source
    def on_movement_start(self, sim: Simulator):

        """

        Called when movement phase starts.

        """

        pass

on_sim_end

def on_sim_end(
    self,
    sim: 'Simulator'
)

Called when the simulation ends.

View Source
    def on_sim_end(self, sim: Simulator):

        """

        Called when the simulation ends.

        """

        pass

on_sim_start

def on_sim_start(
    self,
    sim: wtracker.sim.simulator.Simulator
)

Called when the simulation starts.

View Source
    def on_sim_start(self, sim: Simulator):

        self._camera_bboxes.clear()

predict

def predict(
    self,
    frame_nums: Collection[int],
    relative: bool = True
) -> numpy.ndarray
View Source
    def predict(self, frame_nums: Collection[int], relative: bool = True) -> np.ndarray:

        assert len(frame_nums) > 0

        frame_nums = np.asanyarray(frame_nums, dtype=int)

        valid_mask = (frame_nums >= 0) & (frame_nums < self._csv_data.shape[0])

        worm_bboxes = np.full((frame_nums.shape[0], 4), np.nan)

        worm_bboxes[valid_mask] = self._csv_data[frame_nums[valid_mask], :]

        if not relative:

            return worm_bboxes

        # TODO: if relative == True then it works only if frame number if within the last cycle.

        # maybe fix that.

        cam_bboxes = [self._camera_bboxes[n % self.timing_config.cycle_frame_num] for n in frame_nums]

        cam_bboxes = np.asanyarray(cam_bboxes, dtype=float)

        # make bbox relative to camera view

        worm_bboxes[:, 0] -= cam_bboxes[:, 0]

        worm_bboxes[:, 1] -= cam_bboxes[:, 1]

        return worm_bboxes

provide_movement_vector

def provide_movement_vector(
    self,
    sim: wtracker.sim.simulator.Simulator
) -> tuple[int, int]

Provides the movement vector for the simulator. The platform is moved by the provided vector.

Returns:

Type Description
tuple[int, int] The movement vector in format (dx, dy).
The platform will be moved by dx pixels in the x-direction and dy pixels in the y-direction.
View Source
    def provide_movement_vector(self, sim: Simulator) -> tuple[int, int]:

        # extract portion matching next imaging phase

        next_imaging_start = (sim.cycle_number + 1) * self.timing_config.cycle_frame_num

        next_imaging_end = next_imaging_start + self.timing_config.imaging_frame_num

        next_imaging = self._csv_centers[next_imaging_start:next_imaging_end, :]

        next_imaging = next_imaging[np.isfinite(next_imaging).all(axis=1)]

        if len(next_imaging) == 0:

            return 0, 0

        x_next, y_next = np.median(next_imaging, axis=0)

        cam_x, cam_y, cam_w, cam_h = sim.view.camera_position

        cam_mid = cam_x + cam_w / 2, cam_y + cam_h / 2

        return round(x_next - cam_mid[0]), round(y_next - cam_mid[1])