Module wtracker.sim.simulator
View Source
from __future__ import annotations
import numpy as np
import abc
from tqdm.auto import tqdm
from wtracker.sim.view_controller import ViewController
from wtracker.sim.config import TimingConfig, ExperimentConfig
from wtracker.sim.motor_controllers import MotorController, SineMotorController
from wtracker.utils.frame_reader import FrameReader, DummyReader
class Simulator:
"""
A class representing a simulator for a biological experiment.
Args:
timing_config (TimingConfig): The timing configuration for the experiment.
experiment_config (ExperimentConfig): The experiment configuration.
sim_controller (SimController): The simulation controller.
reader (FrameReader, optional): The frame reader.
motor_controller (MotorController, optional): The motor controller.
Attributes:
timing_config (TimingConfig): The timing configuration for the experiment.
experiment_config (ExperimentConfig): The experiment configuration.
"""
def __init__(
self,
timing_config: TimingConfig,
experiment_config: ExperimentConfig,
sim_controller: SimController,
reader: FrameReader = None,
motor_controller: MotorController = None,
) -> None:
self.timing_config = timing_config
self.experiment_config = experiment_config
self._sim_controller = sim_controller
if reader is None:
num_frames = experiment_config.num_frames
padding_size = (timing_config.camera_size_px[0] // 2 * 2, timing_config.camera_size_px[1] // 2 * 2)
resolution = tuple([sum(x) for x in zip(experiment_config.orig_resolution, padding_size)])
reader = DummyReader(num_frames, resolution, colored=True)
if motor_controller is None:
motor_controller = SineMotorController(timing_config)
self._motor_controller = motor_controller
self._view = ViewController(
frame_reader=reader,
camera_size=timing_config.camera_size_px,
micro_size=timing_config.micro_size_px,
init_position=experiment_config.init_position,
)
@property
def view(self) -> ViewController:
"""
Get the view controller.
Returns:
ViewController: The view controller.
"""
return self._view
@property
def position(self) -> tuple[int, int]:
"""
Get the current position.
Returns:
tuple[int, int]: The current position.
"""
return self._view.position
@property
def cycle_number(self) -> int:
"""
Get the current cycle number.
Returns:
int: The current cycle number.
"""
return self._view.index // self.timing_config.cycle_frame_num
@property
def frame_number(self) -> int:
"""
Get the current frame number.
Returns:
int: The current frame number.
"""
return self._view.index
@property
def cycle_step(self) -> int:
"""
Get the current cycle step.
Returns:
int: The current cycle step.
"""
return self._view.index % self.timing_config.cycle_frame_num
def camera_view(self) -> np.ndarray:
"""
Get the view that the camera sees.
Returns:
np.ndarray: The camera view.
"""
return self._view.camera_view()
def micro_view(self) -> np.ndarray:
"""
Get the view that the microscope sees.
Returns:
np.ndarray: The micro view.
"""
return self._view.micro_view()
def _reset(self):
"""
Reset the simulator.
"""
self.view.reset()
self.view.set_position(*self.experiment_config.init_position)
def run(self, visualize: bool = False, wait_key: bool = False):
"""
Run the simulation.
Args:
visualize (bool, optional): Whether to visualize the simulation.
wait_key (bool, optional): Whether to wait for a key press to advance the simulation during visualization.
"""
config = self.timing_config
total_cycles = len(self._view) // config.cycle_frame_num
pbar = tqdm(total=total_cycles, desc="Simulation Progress", unit="cycle")
self._reset()
self._sim_controller.on_sim_start(self)
while self._view.progress():
if self.cycle_step == 0:
if self.cycle_number > 0:
self._sim_controller.on_movement_end(self)
self._sim_controller.on_cycle_end(self)
self._sim_controller.on_cycle_start(self)
self._sim_controller.on_camera_frame(self)
if self.cycle_step == 0:
self._sim_controller.on_imaging_start(self)
if self.cycle_step < config.imaging_frame_num:
self._sim_controller.on_micro_frame(self)
if self.cycle_step == config.imaging_frame_num - config.pred_frame_num:
self._sim_controller.begin_movement_prediction(self)
if self.cycle_step == config.imaging_frame_num:
self._sim_controller.on_imaging_end(self)
dx, dy = self._sim_controller.provide_movement_vector(self)
self._sim_controller.on_movement_start(self)
self._motor_controller.register_move(dx, dy)
if config.imaging_frame_num <= self.cycle_step < config.imaging_frame_num + config.moving_frame_num:
dx, dy = self._motor_controller.step()
self._view.move_position(dx, dy)
if self.cycle_step == config.cycle_frame_num - 1:
pbar.update(1)
if visualize:
self._view.visualize_world(timeout=0 if wait_key else 1)
self._sim_controller.on_sim_end(self)
pbar.close()
class SimController(abc.ABC):
"""
Abstract base class for simulator controllers.
Attributes:
timing_config (TimingConfig): The timing configuration for the simulator.
"""
def __init__(self, timing_config: TimingConfig):
self.timing_config = timing_config
def on_sim_start(self, sim: Simulator):
"""
Called when the simulation starts.
"""
pass
def on_sim_end(self, sim: Simulator):
"""
Called when the simulation ends.
"""
pass
def on_cycle_start(self, sim: Simulator):
"""
Called when a new cycle starts.
"""
pass
def on_cycle_end(self, sim: Simulator):
"""
Called when a cycle ends.
"""
pass
def on_camera_frame(self, sim: Simulator):
"""
Called when a camera frame is captured. Happens every frame.
"""
pass
def on_imaging_start(self, sim: Simulator):
"""
Called when imaging phase starts.
"""
pass
def on_micro_frame(self, sim: Simulator):
"""
Called when a micro frame is captured. Happens for every during the imaging phase.
"""
pass
def on_imaging_end(self, sim: Simulator):
"""
Called when imaging phase ends.
"""
pass
def on_movement_start(self, sim: Simulator):
"""
Called when movement phase starts.
"""
pass
def on_movement_end(self, sim: Simulator):
"""
Called when movement phase ends.
"""
pass
@abc.abstractmethod
def begin_movement_prediction(self, sim: Simulator) -> None:
"""
Called when the movement prediction begins.
"""
raise NotImplementedError()
@abc.abstractmethod
def provide_movement_vector(self, sim: Simulator) -> tuple[int, int]:
"""
Provides the movement vector for the simulator. The platform is moved by the provided vector.
Returns:
tuple[int, int]: The movement vector in format (dx, dy).
The platform will be moved by dx pixels in the x-direction and dy pixels in the y-direction.
"""
raise NotImplementedError()
@abc.abstractmethod
def _cycle_predict_all(self, sim: Simulator) -> np.ndarray:
"""
Returns a list of bbox predictions of the worm, for each frame of the current cycle.
If a prediction is not available, return None for that frame.
Used internally for logging.
"""
raise NotImplementedError()
Classes
SimController
class SimController(
timing_config: 'TimingConfig'
)
Abstract base class for simulator controllers.
Attributes
Name | Type | Description | Default |
---|---|---|---|
timing_config | TimingConfig | The timing configuration for the simulator. | None |
View Source
class SimController(abc.ABC):
"""
Abstract base class for simulator controllers.
Attributes:
timing_config (TimingConfig): The timing configuration for the simulator.
"""
def __init__(self, timing_config: TimingConfig):
self.timing_config = timing_config
def on_sim_start(self, sim: Simulator):
"""
Called when the simulation starts.
"""
pass
def on_sim_end(self, sim: Simulator):
"""
Called when the simulation ends.
"""
pass
def on_cycle_start(self, sim: Simulator):
"""
Called when a new cycle starts.
"""
pass
def on_cycle_end(self, sim: Simulator):
"""
Called when a cycle ends.
"""
pass
def on_camera_frame(self, sim: Simulator):
"""
Called when a camera frame is captured. Happens every frame.
"""
pass
def on_imaging_start(self, sim: Simulator):
"""
Called when imaging phase starts.
"""
pass
def on_micro_frame(self, sim: Simulator):
"""
Called when a micro frame is captured. Happens for every during the imaging phase.
"""
pass
def on_imaging_end(self, sim: Simulator):
"""
Called when imaging phase ends.
"""
pass
def on_movement_start(self, sim: Simulator):
"""
Called when movement phase starts.
"""
pass
def on_movement_end(self, sim: Simulator):
"""
Called when movement phase ends.
"""
pass
@abc.abstractmethod
def begin_movement_prediction(self, sim: Simulator) -> None:
"""
Called when the movement prediction begins.
"""
raise NotImplementedError()
@abc.abstractmethod
def provide_movement_vector(self, sim: Simulator) -> tuple[int, int]:
"""
Provides the movement vector for the simulator. The platform is moved by the provided vector.
Returns:
tuple[int, int]: The movement vector in format (dx, dy).
The platform will be moved by dx pixels in the x-direction and dy pixels in the y-direction.
"""
raise NotImplementedError()
@abc.abstractmethod
def _cycle_predict_all(self, sim: Simulator) -> np.ndarray:
"""
Returns a list of bbox predictions of the worm, for each frame of the current cycle.
If a prediction is not available, return None for that frame.
Used internally for logging.
"""
raise NotImplementedError()
Ancestors (in MRO)
- abc.ABC
Descendants
- wtracker.sim.sim_controllers.csv_controller.CsvController
- wtracker.sim.sim_controllers.logging_controller.LoggingController
- wtracker.sim.sim_controllers.yolo_controller.YoloController
Methods
begin_movement_prediction
def begin_movement_prediction(
self,
sim: 'Simulator'
) -> 'None'
Called when the movement prediction begins.
View Source
@abc.abstractmethod
def begin_movement_prediction(self, sim: Simulator) -> None:
"""
Called when the movement prediction begins.
"""
raise NotImplementedError()
on_camera_frame
def on_camera_frame(
self,
sim: 'Simulator'
)
Called when a camera frame is captured. Happens every frame.
View Source
def on_camera_frame(self, sim: Simulator):
"""
Called when a camera frame is captured. Happens every frame.
"""
pass
on_cycle_end
def on_cycle_end(
self,
sim: 'Simulator'
)
Called when a cycle ends.
View Source
def on_cycle_end(self, sim: Simulator):
"""
Called when a cycle ends.
"""
pass
on_cycle_start
def on_cycle_start(
self,
sim: 'Simulator'
)
Called when a new cycle starts.
View Source
def on_cycle_start(self, sim: Simulator):
"""
Called when a new cycle starts.
"""
pass
on_imaging_end
def on_imaging_end(
self,
sim: 'Simulator'
)
Called when imaging phase ends.
View Source
def on_imaging_end(self, sim: Simulator):
"""
Called when imaging phase ends.
"""
pass
on_imaging_start
def on_imaging_start(
self,
sim: 'Simulator'
)
Called when imaging phase starts.
View Source
def on_imaging_start(self, sim: Simulator):
"""
Called when imaging phase starts.
"""
pass
on_micro_frame
def on_micro_frame(
self,
sim: 'Simulator'
)
Called when a micro frame is captured. Happens for every during the imaging phase.
View Source
def on_micro_frame(self, sim: Simulator):
"""
Called when a micro frame is captured. Happens for every during the imaging phase.
"""
pass
on_movement_end
def on_movement_end(
self,
sim: 'Simulator'
)
Called when movement phase ends.
View Source
def on_movement_end(self, sim: Simulator):
"""
Called when movement phase ends.
"""
pass
on_movement_start
def on_movement_start(
self,
sim: 'Simulator'
)
Called when movement phase starts.
View Source
def on_movement_start(self, sim: Simulator):
"""
Called when movement phase starts.
"""
pass
on_sim_end
def on_sim_end(
self,
sim: 'Simulator'
)
Called when the simulation ends.
View Source
def on_sim_end(self, sim: Simulator):
"""
Called when the simulation ends.
"""
pass
on_sim_start
def on_sim_start(
self,
sim: 'Simulator'
)
Called when the simulation starts.
View Source
def on_sim_start(self, sim: Simulator):
"""
Called when the simulation starts.
"""
pass
provide_movement_vector
def provide_movement_vector(
self,
sim: 'Simulator'
) -> 'tuple[int, int]'
Provides the movement vector for the simulator. The platform is moved by the provided vector.
Returns:
Type | Description |
---|---|
tuple[int, int] | The movement vector in format (dx, dy). The platform will be moved by dx pixels in the x-direction and dy pixels in the y-direction. |
View Source
@abc.abstractmethod
def provide_movement_vector(self, sim: Simulator) -> tuple[int, int]:
"""
Provides the movement vector for the simulator. The platform is moved by the provided vector.
Returns:
tuple[int, int]: The movement vector in format (dx, dy).
The platform will be moved by dx pixels in the x-direction and dy pixels in the y-direction.
"""
raise NotImplementedError()
Simulator
class Simulator(
timing_config: 'TimingConfig',
experiment_config: 'ExperimentConfig',
sim_controller: 'SimController',
reader: 'FrameReader' = None,
motor_controller: 'MotorController' = None
)
A class representing a simulator for a biological experiment.
Attributes
Name | Type | Description | Default |
---|---|---|---|
timing_config | TimingConfig | The timing configuration for the experiment. | None |
experiment_config | ExperimentConfig | The experiment configuration. | None |
sim_controller | SimController | The simulation controller. | None |
reader | FrameReader | The frame reader. | None |
motor_controller | MotorController | The motor controller. | None |
timing_config | TimingConfig | The timing configuration for the experiment. | None |
experiment_config | ExperimentConfig | The experiment configuration. | None |
View Source
class Simulator:
"""
A class representing a simulator for a biological experiment.
Args:
timing_config (TimingConfig): The timing configuration for the experiment.
experiment_config (ExperimentConfig): The experiment configuration.
sim_controller (SimController): The simulation controller.
reader (FrameReader, optional): The frame reader.
motor_controller (MotorController, optional): The motor controller.
Attributes:
timing_config (TimingConfig): The timing configuration for the experiment.
experiment_config (ExperimentConfig): The experiment configuration.
"""
def __init__(
self,
timing_config: TimingConfig,
experiment_config: ExperimentConfig,
sim_controller: SimController,
reader: FrameReader = None,
motor_controller: MotorController = None,
) -> None:
self.timing_config = timing_config
self.experiment_config = experiment_config
self._sim_controller = sim_controller
if reader is None:
num_frames = experiment_config.num_frames
padding_size = (timing_config.camera_size_px[0] // 2 * 2, timing_config.camera_size_px[1] // 2 * 2)
resolution = tuple([sum(x) for x in zip(experiment_config.orig_resolution, padding_size)])
reader = DummyReader(num_frames, resolution, colored=True)
if motor_controller is None:
motor_controller = SineMotorController(timing_config)
self._motor_controller = motor_controller
self._view = ViewController(
frame_reader=reader,
camera_size=timing_config.camera_size_px,
micro_size=timing_config.micro_size_px,
init_position=experiment_config.init_position,
)
@property
def view(self) -> ViewController:
"""
Get the view controller.
Returns:
ViewController: The view controller.
"""
return self._view
@property
def position(self) -> tuple[int, int]:
"""
Get the current position.
Returns:
tuple[int, int]: The current position.
"""
return self._view.position
@property
def cycle_number(self) -> int:
"""
Get the current cycle number.
Returns:
int: The current cycle number.
"""
return self._view.index // self.timing_config.cycle_frame_num
@property
def frame_number(self) -> int:
"""
Get the current frame number.
Returns:
int: The current frame number.
"""
return self._view.index
@property
def cycle_step(self) -> int:
"""
Get the current cycle step.
Returns:
int: The current cycle step.
"""
return self._view.index % self.timing_config.cycle_frame_num
def camera_view(self) -> np.ndarray:
"""
Get the view that the camera sees.
Returns:
np.ndarray: The camera view.
"""
return self._view.camera_view()
def micro_view(self) -> np.ndarray:
"""
Get the view that the microscope sees.
Returns:
np.ndarray: The micro view.
"""
return self._view.micro_view()
def _reset(self):
"""
Reset the simulator.
"""
self.view.reset()
self.view.set_position(*self.experiment_config.init_position)
def run(self, visualize: bool = False, wait_key: bool = False):
"""
Run the simulation.
Args:
visualize (bool, optional): Whether to visualize the simulation.
wait_key (bool, optional): Whether to wait for a key press to advance the simulation during visualization.
"""
config = self.timing_config
total_cycles = len(self._view) // config.cycle_frame_num
pbar = tqdm(total=total_cycles, desc="Simulation Progress", unit="cycle")
self._reset()
self._sim_controller.on_sim_start(self)
while self._view.progress():
if self.cycle_step == 0:
if self.cycle_number > 0:
self._sim_controller.on_movement_end(self)
self._sim_controller.on_cycle_end(self)
self._sim_controller.on_cycle_start(self)
self._sim_controller.on_camera_frame(self)
if self.cycle_step == 0:
self._sim_controller.on_imaging_start(self)
if self.cycle_step < config.imaging_frame_num:
self._sim_controller.on_micro_frame(self)
if self.cycle_step == config.imaging_frame_num - config.pred_frame_num:
self._sim_controller.begin_movement_prediction(self)
if self.cycle_step == config.imaging_frame_num:
self._sim_controller.on_imaging_end(self)
dx, dy = self._sim_controller.provide_movement_vector(self)
self._sim_controller.on_movement_start(self)
self._motor_controller.register_move(dx, dy)
if config.imaging_frame_num <= self.cycle_step < config.imaging_frame_num + config.moving_frame_num:
dx, dy = self._motor_controller.step()
self._view.move_position(dx, dy)
if self.cycle_step == config.cycle_frame_num - 1:
pbar.update(1)
if visualize:
self._view.visualize_world(timeout=0 if wait_key else 1)
self._sim_controller.on_sim_end(self)
pbar.close()
Instance variables
cycle_number
Get the current cycle number.
cycle_step
Get the current cycle step.
frame_number
Get the current frame number.
position
Get the current position.
view
Get the view controller.
Methods
camera_view
def camera_view(
self
) -> 'np.ndarray'
Get the view that the camera sees.
Returns:
Type | Description |
---|---|
np.ndarray | The camera view. |
View Source
def camera_view(self) -> np.ndarray:
"""
Get the view that the camera sees.
Returns:
np.ndarray: The camera view.
"""
return self._view.camera_view()
micro_view
def micro_view(
self
) -> 'np.ndarray'
Get the view that the microscope sees.
Returns:
Type | Description |
---|---|
np.ndarray | The micro view. |
View Source
def micro_view(self) -> np.ndarray:
"""
Get the view that the microscope sees.
Returns:
np.ndarray: The micro view.
"""
return self._view.micro_view()
run
def run(
self,
visualize: 'bool' = False,
wait_key: 'bool' = False
)
Run the simulation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
visualize | bool | Whether to visualize the simulation. | None |
wait_key | bool | Whether to wait for a key press to advance the simulation during visualization. | None |
View Source
def run(self, visualize: bool = False, wait_key: bool = False):
"""
Run the simulation.
Args:
visualize (bool, optional): Whether to visualize the simulation.
wait_key (bool, optional): Whether to wait for a key press to advance the simulation during visualization.
"""
config = self.timing_config
total_cycles = len(self._view) // config.cycle_frame_num
pbar = tqdm(total=total_cycles, desc="Simulation Progress", unit="cycle")
self._reset()
self._sim_controller.on_sim_start(self)
while self._view.progress():
if self.cycle_step == 0:
if self.cycle_number > 0:
self._sim_controller.on_movement_end(self)
self._sim_controller.on_cycle_end(self)
self._sim_controller.on_cycle_start(self)
self._sim_controller.on_camera_frame(self)
if self.cycle_step == 0:
self._sim_controller.on_imaging_start(self)
if self.cycle_step < config.imaging_frame_num:
self._sim_controller.on_micro_frame(self)
if self.cycle_step == config.imaging_frame_num - config.pred_frame_num:
self._sim_controller.begin_movement_prediction(self)
if self.cycle_step == config.imaging_frame_num:
self._sim_controller.on_imaging_end(self)
dx, dy = self._sim_controller.provide_movement_vector(self)
self._sim_controller.on_movement_start(self)
self._motor_controller.register_move(dx, dy)
if config.imaging_frame_num <= self.cycle_step < config.imaging_frame_num + config.moving_frame_num:
dx, dy = self._motor_controller.step()
self._view.move_position(dx, dy)
if self.cycle_step == config.cycle_frame_num - 1:
pbar.update(1)
if visualize:
self._view.visualize_world(timeout=0 if wait_key else 1)
self._sim_controller.on_sim_end(self)
pbar.close()