Module wtracker.eval.data_analyzer
View Source
from __future__ import annotations
import pandas as pd
import numpy as np
import tqdm.contrib.concurrent as concurrent
from wtracker.sim.config import TimingConfig
from wtracker.eval.error_calculator import ErrorCalculator
from wtracker.utils.frame_reader import FrameReader
from wtracker.utils.threading_utils import adjust_num_workers
class DataAnalyzer:
    """
    A class for analyzing a simulation log.

    Args:
        time_config (TimingConfig): The timing configuration.
        log_data (pd.DataFrame): Dataframe containing the simulation log data.
    """

    def __init__(
        self,
        time_config: TimingConfig,
        log_data: pd.DataFrame,
    ):
        self.time_config = time_config
        # Working copy, mutated by clean()/remove_cycle()/change_unit().
        self.data = log_data.copy()
        # Full log, kept so save() and reset_changes() operate on the complete data.
        self._orig_data = log_data
        # Current unit of time/distance: "frame" (frames, pixels) or "sec" (seconds, micrometers).
        self._unit = "frame"

    @property
    def unit(self) -> str:
        """The current unit of the data, either "frame" or "sec"."""
        return self._unit

    def save(self, path: str) -> None:
        """
        Save the full analyzed data to a csv file.

        Args:
            path (str): Destination path of the csv file.
        """
        self._orig_data.to_csv(path, index=False)

    @staticmethod
    def load(time_config: TimingConfig, csv_path: str) -> DataAnalyzer:
        """
        Create a DataAnalyzer object from a csv file containing experiment data,
        regardless of whether it has been analyzed or not.

        Args:
            time_config (TimingConfig): The timing configuration.
            csv_path (str): Path to the csv file containing the experiment data.
        """
        data = pd.read_csv(csv_path)
        return DataAnalyzer(time_config, data)

    def initialize(self, period: int = 10):
        """
        Initializes the data analyzer.
        It's essential to call this function if the class was created from a non-analyzed log data.

        Args:
            period (int): The period for calculating speed, in frames.
                The speed is calculated by measuring the distance between the current frame
                and `period` frames before.
        """
        data = self._orig_data
        data["time"] = data["frame"]
        data["cycle_step"] = data["frame"] % self.time_config.cycle_frame_num
        data = DataAnalyzer._calc_centers(data)
        data = DataAnalyzer._calc_speed(data, period)
        data = DataAnalyzer._calc_worm_deviation(data)
        data = DataAnalyzer._calc_errors(data)
        data = data.round(5)
        self._orig_data = data
        self.data = self._orig_data.copy()

    @staticmethod
    def _calc_centers(data: pd.DataFrame) -> pd.DataFrame:
        # Bboxes are stored as (x, y, w, h); center = top-left corner + half size.
        data["wrm_center_x"] = data["wrm_x"] + data["wrm_w"] / 2
        data["wrm_center_y"] = data["wrm_y"] + data["wrm_h"] / 2
        data["mic_center_x"] = data["mic_x"] + data["mic_w"] / 2
        data["mic_center_y"] = data["mic_y"] + data["mic_h"] / 2
        return data

    @staticmethod
    def _calc_speed(data: pd.DataFrame, n: int) -> pd.DataFrame:
        # Finite difference over a window of n rows; the first n rows come out NaN.
        diff = data["time"].diff(n).to_numpy()
        data["wrm_speed_x"] = data["wrm_center_x"].diff(n) / diff
        data["wrm_speed_y"] = data["wrm_center_y"].diff(n) / diff
        data["wrm_speed"] = np.sqrt(data["wrm_speed_x"] ** 2 + data["wrm_speed_y"] ** 2)
        return data

    @staticmethod
    def _calc_worm_deviation(data: pd.DataFrame) -> pd.DataFrame:
        # Signed offsets of the worm center from the microscope center, plus euclidean distance.
        data["worm_deviation_x"] = data["wrm_center_x"] - data["mic_center_x"]
        data["worm_deviation_y"] = data["wrm_center_y"] - data["mic_center_y"]
        data["worm_deviation"] = np.sqrt(data["worm_deviation_x"] ** 2 + data["worm_deviation_y"] ** 2)
        return data

    @staticmethod
    def _calc_errors(data: pd.DataFrame) -> pd.DataFrame:
        wrm_bboxes = data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()
        mic_bboxes = data[["mic_x", "mic_y", "mic_w", "mic_h"]].to_numpy()
        bbox_error = ErrorCalculator.calculate_bbox_error(wrm_bboxes, mic_bboxes)
        data["bbox_error"] = bbox_error
        # The precise (segmentation-based) error is expensive; it is filled in
        # later by calc_precise_error() / calc_precise_error_experimental().
        data["precise_error"] = np.nan
        return data

    def remove_cycle(self, cycles: int | list[int]):
        """
        Remove the specified cycles from the data.

        Args:
            cycles (int | list[int]): The cycle(s) to remove from the data.
        """
        if isinstance(cycles, int):
            cycles = [cycles]
        mask = self.data["cycle"].isin(cycles)
        self.data = self.data[~mask]

    def clean(
        self,
        trim_cycles: bool = False,
        imaging_only: bool = False,
        bounds: tuple[float, float, float, float] = None,
    ) -> None:
        """
        Clean the data by the provided parameters.

        Args:
            trim_cycles (bool): Whether to remove the first and the last cycles from the data.
            imaging_only (bool): Flag indicating whether to include only imaging phases in the analysis.
            bounds (tuple[float, float, float, float]): The legal bounds (x_min, y_min, x_max, y_max)
                for worm movement; rows whose relevant bbox falls outside these bounds are dropped.
        """
        data = self.data

        if imaging_only:
            mask = data["phase"] == "imaging"
            data = data[mask]

        if bounds is not None:
            has_pred = np.isfinite(data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()).all(axis=1)
            mask_wrm = has_pred  # if there is a prediction for a frame then look at worm bbox
            mask_wrm &= (data["wrm_x"] >= bounds[0]) & (data["wrm_x"] + data["wrm_w"] <= bounds[2])
            mask_wrm &= (data["wrm_y"] >= bounds[1]) & (data["wrm_y"] + data["wrm_h"] <= bounds[3])
            mask_mic = ~has_pred  # if there is no prediction for a frame then look at micro bbox
            mask_mic &= (data["mic_x"] >= bounds[0]) & (data["mic_x"] + data["mic_w"] <= bounds[2])
            mask_mic &= (data["mic_y"] >= bounds[1]) & (data["mic_y"] + data["mic_h"] <= bounds[3])
            data = data[mask_wrm | mask_mic]

        if trim_cycles:
            mask = data["cycle"] != 0
            mask &= data["cycle"] != data["cycle"].max()
            data = data[mask]

        self.data = data

    def reset_changes(self):
        """
        Reset the data to its original state.
        Note that this also resets the unit back to "frame", since the original
        data is stored in frame/pixel units.
        """
        self.data = self._orig_data.copy()
        self._unit = "frame"

    def column_names(self) -> list[str]:
        """
        Returns a list of all column names in the analyzed data.

        Returns:
            list[str]: A list of column names.
        """
        return self.data.columns.to_list()

    def change_unit(self, unit: str):
        """
        Changes the unit of time and distance in the data.

        Args:
            unit (str): The new unit of time to convert into. Can be "frame" or "sec".
                If "sec" is chosen, the time will be converted to seconds, and the distance metric is micrometer.
                If "frame" is chosen, the time will be in frames, and the distance metric is pixels.
        """
        assert unit in ["frame", "sec"]

        if self._unit == unit:
            return

        data = self.data

        if unit == "sec":  # frame -> sec
            dist_factor = self.time_config.mm_per_px * 1000  # px -> micrometer
            time_factor = self.time_config.ms_per_frame / 1000  # frame -> sec
        else:  # sec -> frame
            dist_factor = self.time_config.px_per_mm / 1000  # micrometer -> px
            time_factor = self.time_config.frames_per_sec  # sec -> frame

        data["time"] *= time_factor
        data[["plt_x", "plt_y"]] *= dist_factor
        data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]] *= dist_factor
        data[["mic_x", "mic_y", "mic_w", "mic_h"]] *= dist_factor
        data[["cam_x", "cam_y", "cam_w", "cam_h"]] *= dist_factor
        data[["wrm_center_x", "wrm_center_y"]] *= dist_factor
        data[["mic_center_x", "mic_center_y"]] *= dist_factor
        data[["worm_deviation_x", "worm_deviation_y", "worm_deviation"]] *= dist_factor
        # Speed is distance/time, so both factors apply.
        data[["wrm_speed_x", "wrm_speed_y", "wrm_speed"]] *= dist_factor / time_factor

        self._unit = unit
        self.data = data

    # TODO: TEST
    # TODO: MAYBE REMOVE, THE non-multithreaded version works very fast for me for some reason
    # perhaps SSD is required for fast analysis.
    def calc_precise_error_experimental(
        self,
        worm_reader: FrameReader,
        background: np.ndarray,
        diff_thresh: int = 20,
        num_workers: int = None,
        chunk_size: int = 2000,
    ) -> None:
        """
        Calculate the precise error between the worm and the microscope view.
        This error is segmentation based, and measures the proportion of worm's head that is
        outside of the view of the microscope. Note that this calculation might take a while.

        Args:
            worm_reader (FrameReader): Images of the worm at each frame, cropped to the size of the bounding box
                which was detected around the worm.
            background (np.ndarray): The background image of the entire experiment.
            diff_thresh (int): Difference threshold to differentiate between the background and foreground.
                A foreground object is detected if the pixel value difference with the background is greater than this threshold.
            num_workers (int, optional): The number of workers to use for parallel processing.
                If None, the number of workers is determined automatically.
            chunk_size (int, optional): The size of each processing chunk.
        """
        frames = self._orig_data["frame"].to_numpy().astype(int, copy=False)
        wrm_bboxes = self._orig_data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()
        mic_bboxes = self._orig_data[["mic_x", "mic_y", "mic_w", "mic_h"]].to_numpy()

        # Rows without a finite worm bbox (no prediction) keep the maximal error of 1.0.
        errors = np.ones_like(frames, dtype=float)
        mask = np.isfinite(wrm_bboxes).all(axis=1)
        wrm_bboxes = wrm_bboxes[mask]
        mic_bboxes = mic_bboxes[mask]
        frames = frames[mask]

        # Guard against zero sections: np.array_split raises ValueError when asked
        # for 0 sections, which happened whenever len(frames) < chunk_size.
        num_sections = max(1, len(frames) // chunk_size)
        wrm_bboxes_list = np.array_split(wrm_bboxes, num_sections, axis=0)
        mic_bboxes_list = np.array_split(mic_bboxes, num_sections, axis=0)
        frames_list = np.array_split(frames, num_sections)

        # TODO: add non-multithreaded case whenever num_workers=0
        num_workers = adjust_num_workers(len(frames), chunk_size, num_workers)

        def calc_error(idx: int) -> np.ndarray:
            # Worker body: compute the precise error for one chunk of frames.
            return ErrorCalculator.calculate_precise(
                background=background,
                worm_bboxes=wrm_bboxes_list[idx],
                mic_bboxes=mic_bboxes_list[idx],
                frame_nums=frames_list[idx],
                worm_reader=worm_reader,
                diff_thresh=diff_thresh,
            )

        results = concurrent.thread_map(
            calc_error,
            list(range(len(wrm_bboxes_list))),
            max_workers=num_workers,
            chunksize=1,
            desc="Extracting bboxes",
            unit="fr",
            leave=False,
        )

        # set the error in the original data
        errors[mask] = np.concatenate(results)
        self._orig_data["precise_error"] = errors

        # copy relevant error entries into the work data
        # NOTE(review): this assumes "frame" values are valid 0-based row indices into
        # `errors` (i.e. frame numbers are contiguous in the original log) — confirm.
        idx = self.data["frame"].to_numpy(dtype=int, copy=False)
        self.data["precise_error"] = errors[idx]

    def calc_precise_error(
        self,
        worm_reader: FrameReader,
        background: np.ndarray,
        diff_thresh: int = 20,
    ) -> None:
        """
        Calculate the precise error between the worm and the microscope view.
        This error is segmentation based, and measures the proportion of worm's head that is
        outside of the view of the microscope. Note that this calculation might take a while.

        Args:
            worm_reader (FrameReader): Images of the worm at each frame, cropped to the size of the bounding box
                which was detected around the worm.
            background (np.ndarray): The background image of the entire experiment.
            diff_thresh (int): Difference threshold to differentiate between the background and foreground.
                A foreground object is detected if the pixel value difference with the background is greater than this threshold.
        """
        frames = self._orig_data["frame"].to_numpy().astype(np.int32, copy=False)
        wrm_bboxes = self._orig_data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()
        mic_bboxes = self._orig_data[["mic_x", "mic_y", "mic_w", "mic_h"]].to_numpy()

        errors = ErrorCalculator.calculate_precise(
            background=background,
            worm_bboxes=wrm_bboxes,
            mic_bboxes=mic_bboxes,
            frame_nums=frames,
            worm_reader=worm_reader,
            diff_thresh=diff_thresh,
        )

        self._orig_data["precise_error"] = errors

        # copy relevant error entries into the work data
        # NOTE(review): assumes "frame" values are valid 0-based row indices into
        # `errors` (see calc_precise_error_experimental) — confirm.
        idx = self.data["frame"].to_numpy(dtype=int, copy=False)
        self.data["precise_error"] = errors[idx]

    def calc_anomalies(
        self,
        no_preds: bool = True,
        min_bbox_error: float = np.inf,
        min_dist_error: float = np.inf,
        min_speed: float = np.inf,
        min_size: float = np.inf,
        remove_anomalies: bool = False,
    ) -> pd.DataFrame:
        """
        Calculate anomalies in the data based on specified criteria.

        Args:
            no_preds (bool, optional): Flag indicating whether to consider instances with missing predictions.
            min_bbox_error (float, optional): Minimum bounding box error threshold to consider as anomaly.
            min_dist_error (float, optional): Minimum distance error threshold to consider as anomaly.
            min_speed (float, optional): Minimum speed threshold to consider as anomaly.
            min_size (float, optional): Minimum size threshold to consider as anomaly.
            remove_anomalies (bool, optional): Flag indicating whether to remove the anomalies from the data.

        Returns:
            pd.DataFrame: DataFrame containing the anomalies found in the data.
        """
        data = self.data

        mask_speed = data["wrm_speed"] >= min_speed
        mask_bbox_error = data["bbox_error"] >= min_bbox_error
        mask_dist_error = data["worm_deviation"] >= min_dist_error
        mask_worm_width = data["wrm_w"] >= min_size
        mask_worm_height = data["wrm_h"] >= min_size

        # A row counts as "no prediction" when any worm bbox field is non-finite.
        mask_no_preds = ~np.isfinite(data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()).all(axis=1)
        mask_no_preds = no_preds & mask_no_preds

        mask = mask_speed | mask_bbox_error | mask_dist_error | mask_worm_width | mask_worm_height | mask_no_preds

        anomalies = data[mask].copy()
        # Per-row flags recording which criterion triggered the anomaly.
        anomalies["speed_anomaly"] = mask_speed[mask]
        anomalies["bbox_error_anomaly"] = mask_bbox_error[mask]
        anomalies["dist_error_anomaly"] = mask_dist_error[mask]
        anomalies["width_anomaly"] = mask_worm_width[mask]
        anomalies["height_anomaly"] = mask_worm_height[mask]
        anomalies["no_pred_anomaly"] = mask_no_preds[mask]

        if remove_anomalies:
            self.data = self.data[~mask]

        return anomalies

    def describe(self, columns: list[str] = None, num: int = 3, percentiles: list[float] = None) -> pd.DataFrame:
        """
        Generate descriptive statistics of the specified columns in the table containing the data.

        Args:
            columns (list[str], optional): List of column names to include in the analysis. If None, all columns will be included.
            num (int, optional): Number of evenly spaced percentiles to include in the analysis. If percentiles is not None, this parameter is ignored.
            percentiles (list[float], optional): List of specific percentiles to include in the analysis. If None, evenly spaced percentiles will be used.

        Returns:
            pd.DataFrame: A DataFrame containing the descriptive statistics of the specified columns.
        """
        if columns is None:
            columns = self.column_names()

        if percentiles is None:
            # Interior points of an even split of [0, 1] into num+1 intervals.
            percentiles = np.linspace(start=0, stop=1.0, num=num + 2)[1:-1]

        return self.data[columns].describe(percentiles)

    def print_stats(self) -> None:
        """
        Prints various statistics related to the data.

        This method calculates and prints the following statistics:
        - Count of Removed Frames: The number of frames that were removed from the original data.
        - Total Count of No Pred Frames: The number of frames where the predictions are missing.
        - Total Num of Cycles: The number of unique cycles in the data.
        - Non Perfect Predictions: The percentage of predictions that are not perfect.
        """
        num_removed = len(self._orig_data.index) - len(self.data.index)
        print(f"Count of Removed Frames: {num_removed} ({round(100 * num_removed / len(self._orig_data.index), 3)}%)")

        no_preds = self.data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].isna().any(axis=1).sum()
        print(f"Count of No-Pred Frames: {no_preds} ({round(100 * no_preds / len(self.data.index), 3)}%)")

        num_cycles = self.data["cycle"].nunique()
        print(f"Total Num of Cycles: {num_cycles}")

        # Treat any bbox error above this epsilon as a non-perfect prediction.
        non_perfect = (self.data["bbox_error"] > 1e-7).sum() / len(self.data.index)
        print(f"Non Perfect Predictions: {round(100 * non_perfect, 3)}%")
Classes
DataAnalyzer
class DataAnalyzer(
time_config: 'TimingConfig',
log_data: 'pd.DataFrame'
)
A class for analyzing a simulation log.
Attributes
Name | Type | Description | Default |
---|---|---|---|
time_config | TimingConfig | The timing configuration. | None |
log_data | pd.DataFrame | Dataframe containing the simulation log data. | None |
View Source
class DataAnalyzer:
"""
A class for analyzing simulation log.
Args:
time_config (TimingConfig): The timing configuration.
log_path (pd.DataFrame): Dataframe containing the simulation log data.
"""
def __init__(
self,
time_config: TimingConfig,
log_data: pd.DataFrame,
):
self.time_config = time_config
self.data = log_data.copy()
self._orig_data = log_data
self._unit = "frame"
@property
def unit(self) -> str:
return self._unit
def save(self, path: str) -> None:
"""
Save the full analyzed data to a csv file.
"""
self._orig_data.to_csv(path, index=False)
@staticmethod
def load(time_config: TimingConfig, csv_path: str) -> DataAnalyzer:
"""
Create a DataAnalyzer object from a csv file containing experiment data,
regardless whether if it's analyzed or not.
Args:
time_config (TimingConfig): The timing configuration.
csv_path (str): Path to the csv file containing the experiment data.
"""
data = pd.read_csv(csv_path)
return DataAnalyzer(time_config, data)
def initialize(self, period: int = 10):
"""
Initializes the data analyzer.
It's essential to call this function if the class was created from a non-analyzed log data.
Args:
period (int): The period for calculating speed in frames.
The speed is calculated by measuring the distance between current frame and period frames before.
"""
data = self._orig_data
data["time"] = data["frame"]
data["cycle_step"] = data["frame"] % self.time_config.cycle_frame_num
data = DataAnalyzer._calc_centers(data)
data = DataAnalyzer._calc_speed(data, period)
data = DataAnalyzer._calc_worm_deviation(data)
data = DataAnalyzer._calc_errors(data)
data = data.round(5)
self._orig_data = data
self.data = self._orig_data.copy()
@staticmethod
def _calc_centers(data: pd.DataFrame) -> pd.DataFrame:
data["wrm_center_x"] = data["wrm_x"] + data["wrm_w"] / 2
data["wrm_center_y"] = data["wrm_y"] + data["wrm_h"] / 2
data["mic_center_x"] = data["mic_x"] + data["mic_w"] / 2
data["mic_center_y"] = data["mic_y"] + data["mic_h"] / 2
return data
@staticmethod
def _calc_speed(data: pd.DataFrame, n: int) -> pd.DataFrame:
diff = data["time"].diff(n).to_numpy()
data["wrm_speed_x"] = data["wrm_center_x"].diff(n) / diff
data["wrm_speed_y"] = data["wrm_center_y"].diff(n) / diff
data["wrm_speed"] = np.sqrt(data["wrm_speed_x"] ** 2 + data["wrm_speed_y"] ** 2)
return data
@staticmethod
def _calc_worm_deviation(data: pd.DataFrame) -> pd.DataFrame:
data["worm_deviation_x"] = data["wrm_center_x"] - data["mic_center_x"]
data["worm_deviation_y"] = data["wrm_center_y"] - data["mic_center_y"]
data["worm_deviation"] = np.sqrt(data["worm_deviation_x"] ** 2 + data["worm_deviation_y"] ** 2)
return data
@staticmethod
def _calc_errors(data: pd.DataFrame) -> pd.DataFrame:
wrm_bboxes = data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()
mic_bboxes = data[["mic_x", "mic_y", "mic_w", "mic_h"]].to_numpy()
bbox_error = ErrorCalculator.calculate_bbox_error(wrm_bboxes, mic_bboxes)
data["bbox_error"] = bbox_error
data["precise_error"] = np.nan
return data
def remove_cycle(self, cycles: int | list[int]):
"""
Remove the specified cycles from the data.
Args:
cycles (int | list[int]): The cycle(s) to remove from the data.
"""
if isinstance(cycles, int):
cycles = [cycles]
mask = self.data["cycle"].isin(cycles)
self.data = self.data[~mask]
def clean(
self,
trim_cycles: bool = False,
imaging_only: bool = False,
bounds: tuple[float, float, float, float] = None,
) -> None:
"""
Clean the data by the provided parameters.
Args:
trim_cycles (bool): whether to remove the first and the last cycles from the data.
imaging_only (bool): Flag indicating whether to include only imaging phases in the analysis.
legal_bounds (tuple[float, float, float, float]): The legal bounds for worm movement.
"""
data = self.data
if imaging_only:
mask = data["phase"] == "imaging"
data = data[mask]
if bounds is not None:
has_pred = np.isfinite(data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()).all(axis=1)
mask_wrm = has_pred # if there is a prediction for a frame then look at worm bbox
mask_wrm &= (data["wrm_x"] >= bounds[0]) & (data["wrm_x"] + data["wrm_w"] <= bounds[2])
mask_wrm &= (data["wrm_y"] >= bounds[1]) & (data["wrm_y"] + data["wrm_h"] <= bounds[3])
mask_mic = ~has_pred # if there is no prediction for a frame then look at micro bbox
mask_mic &= (data["mic_x"] >= bounds[0]) & (data["mic_x"] + data["mic_w"] <= bounds[2])
mask_mic &= (data["mic_y"] >= bounds[1]) & (data["mic_y"] + data["mic_h"] <= bounds[3])
data = data[mask_wrm | mask_mic]
if trim_cycles:
mask = data["cycle"] != 0
mask &= data["cycle"] != data["cycle"].max()
data = data[mask]
self.data = data
def reset_changes(self):
"""
Reset the data to its original state.
Note, that this method will not reset the unit of time and distance.
"""
self.data = self._orig_data.copy()
self._unit = "frame"
def column_names(self) -> list[str]:
"""
Returns a list of all column names in the analyzed data.
Returns:
list[str]: A list of column names.
"""
return self.data.columns.to_list()
def change_unit(self, unit: str):
"""
Changes the unit of time and distance in the data.
Args:
unit (str, optional): The new unit of time to convert into. Can be "frame" or "sec".
If "sec" is chosen, the time will be converted to seconds, and the distance metric is micrometer.
If "frame" is chosen, the time will be in frames, and the distance metric is pixels.
"""
assert unit in ["frame", "sec"]
if self._unit == unit:
return
data = self.data
if unit == "sec": # frame -> sec
dist_factor = self.time_config.mm_per_px * 1000
time_factor = self.time_config.ms_per_frame / 1000
if unit == "frame": # sec -> frame
dist_factor = self.time_config.px_per_mm / 1000
time_factor = self.time_config.frames_per_sec
data["time"] *= time_factor
data[["plt_x", "plt_y"]] *= dist_factor
data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]] *= dist_factor
data[["mic_x", "mic_y", "mic_w", "mic_h"]] *= dist_factor
data[["cam_x", "cam_y", "cam_w", "cam_h"]] *= dist_factor
data[["wrm_center_x", "wrm_center_y"]] *= dist_factor
data[["mic_center_x", "mic_center_y"]] *= dist_factor
data[["worm_deviation_x", "worm_deviation_y", "worm_deviation"]] *= dist_factor
data[["wrm_speed_x", "wrm_speed_y", "wrm_speed"]] *= dist_factor / time_factor
self._unit = unit
self.data = data
# TODO: TEST
# TODO: MAYBE REMOVE, THE non-multithreaded version works very fast for me for some reason
# perhaps SSD is required for fast analysis.
def calc_precise_error_experimental(
self,
worm_reader: FrameReader,
background: np.ndarray,
diff_thresh=20,
num_workers: int = None,
chunk_size: int = 2000,
) -> None:
"""
Calculate the precise error between the worm and the microscope view.
This error is segmentation based, and measures the proportion of worm's head that is
outside of the view of the microscope. Note that this calculation might take a while.
Args:
worm_reader (FrameReader): Images of the worm at each frame, cropped to the size of the bounding box
which was detected around the worm.
background (np.ndarray): The background image of the entire experiment.
diff_thresh (int): Difference threshold to differentiate between the background and foreground.
A foreground object is detected if the pixel value difference with the background is greater than this threshold.
num_workers (int, optional): The number of workers to use for parallel processing.
If None, the number of workers is determined automatically.
chunk_size (int, optional): The size of each processing chunk.
"""
frames = self._orig_data["frame"].to_numpy().astype(int, copy=False)
wrm_bboxes = self._orig_data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()
mic_bboxes = self._orig_data[["mic_x", "mic_y", "mic_w", "mic_h"]].to_numpy()
errors = np.ones_like(frames, dtype=float)
mask = np.isfinite(wrm_bboxes).all(axis=1)
wrm_bboxes = wrm_bboxes[mask]
mic_bboxes = mic_bboxes[mask]
frames = frames[mask]
num_sections = len(frames) // chunk_size
wrm_bboxes_list = np.array_split(wrm_bboxes, num_sections, axis=0)
mic_bboxes_list = np.array_split(mic_bboxes, num_sections, axis=0)
frames_list = np.array_split(frames, num_sections)
# TODO: add non-multithreaded case whenever num_workers=0
num_workers = adjust_num_workers(len(frames), chunk_size, num_workers)
def calc_error(idx: int) -> np.ndarray:
return ErrorCalculator.calculate_precise(
background=background,
worm_bboxes=wrm_bboxes_list[idx],
mic_bboxes=mic_bboxes_list[idx],
frame_nums=frames_list[idx],
worm_reader=worm_reader,
diff_thresh=diff_thresh,
)
results = concurrent.thread_map(
calc_error,
list(range(len(wrm_bboxes_list))),
max_workers=num_workers,
chunksize=1,
desc="Extracting bboxes",
unit="fr",
leave=False,
)
# set the error in the original data
errors[mask] = np.concatenate(results)
self._orig_data["precise_error"] = errors
# copy relevant error entries into the work data
idx = self.data["frame"].to_numpy(dtype=int, copy=False)
self.data["precise_error"] = errors[idx]
def calc_precise_error(
self,
worm_reader: FrameReader,
background: np.ndarray,
diff_thresh=20,
) -> None:
"""
Calculate the precise error between the worm and the microscope view.
This error is segmentation based, and measures the proportion of worm's head that is
outside of the view of the microscope. Note that this calculation might take a while.
Args:
worm_reader (FrameReader): Images of the worm at each frame, cropped to the size of the bounding box
which was detected around the worm.
background (np.ndarray): The background image of the entire experiment.
diff_thresh (int): Difference threshold to differentiate between the background and foreground.
A foreground object is detected if the pixel value difference with the background is greater than this threshold.
"""
frames = self._orig_data["frame"].to_numpy().astype(np.int32, copy=False)
wrm_bboxes = self._orig_data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()
mic_bboxes = self._orig_data[["mic_x", "mic_y", "mic_w", "mic_h"]].to_numpy()
errors = ErrorCalculator.calculate_precise(
background=background,
worm_bboxes=wrm_bboxes,
mic_bboxes=mic_bboxes,
frame_nums=frames,
worm_reader=worm_reader,
diff_thresh=diff_thresh,
)
self._orig_data["precise_error"] = errors
# copy relevant error entries into the work data
idx = self.data["frame"].to_numpy(dtype=int, copy=False)
self.data["precise_error"] = errors[idx]
def calc_anomalies(
self,
no_preds: bool = True,
min_bbox_error: float = np.inf,
min_dist_error: float = np.inf,
min_speed: float = np.inf,
min_size: float = np.inf,
remove_anomalies: bool = False,
) -> pd.DataFrame:
"""
Calculate anomalies in the data based on specified criteria.
Args:
no_preds (bool, optional): Flag indicating whether to consider instances with missing predictions.
min_bbox_error (float, optional): Minimum bounding box error threshold to consider as anomaly.
min_dist_error (float, optional): Minimum distance error threshold to consider as anomaly.
min_speed (float, optional): Minimum speed threshold to consider as anomaly.
min_size (float, optional): Minimum size threshold to consider as anomaly.
remove_anomalies (bool, optional): Flag indicating whether to remove the anomalies from the data.
Returns:
pd.DataFrame: DataFrame containing the anomalies found in the data.
"""
data = self.data
mask_speed = data["wrm_speed"] >= min_speed
mask_bbox_error = data["bbox_error"] >= min_bbox_error
mask_dist_error = data["worm_deviation"] >= min_dist_error
mask_worm_width = data["wrm_w"] >= min_size
mask_worm_height = data["wrm_h"] >= min_size
mask_no_preds = np.isfinite(data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()).all(axis=1) == False
mask_no_preds = no_preds & mask_no_preds
mask = mask_speed | mask_bbox_error | mask_dist_error | mask_worm_width | mask_worm_height | mask_no_preds
anomalies = data[mask].copy()
anomalies["speed_anomaly"] = mask_speed[mask]
anomalies["bbox_error_anomaly"] = mask_bbox_error[mask]
anomalies["dist_error_anomaly"] = mask_dist_error[mask]
anomalies["width_anomaly"] = mask_worm_width[mask]
anomalies["height_anomaly"] = mask_worm_height[mask]
anomalies["no_pred_anomaly"] = mask_no_preds[mask]
if remove_anomalies:
self.data = self.data[~mask]
return anomalies
def describe(self, columns: list[str] = None, num: int = 3, percentiles: list[float] = None) -> pd.DataFrame:
"""
Generate descriptive statistics of the specified columns in the table containing the data.
Args:
columns (list[str], optional): List of column names to include in the analysis. If None, all columns will be included.
num (int, optional): Number of evenly spaced percentiles to include in the analysis. If percentiles is not None, this parameter is ignored.
percentiles (list[float], optional): List of specific percentiles to include in the analysis. If None, evenly spaced percentiles will be used.
Returns:
pd.DataFrame: A DataFrame containing the descriptive statistics of the specified columns.
"""
if columns is None:
columns = self.column_names()
if percentiles is None:
percentiles = np.linspace(start=0, stop=1.0, num=num + 2)[1:-1]
return self.data[columns].describe(percentiles)
def print_stats(self) -> None:
"""
Prints various statistics related to the data.
This method calculates and prints the following statistics:
- Count of Removed Frames: The number of frames that were removed from the original data.
- Total Count of No Pred Frames: The number of frames where the predictions are missing.
- Total Num of Cycles: The number of unique cycles in the data.
- Non Perfect Predictions: The percentage of predictions that are not perfect.
"""
num_removed = len(self._orig_data.index) - len(self.data.index)
print(f"Count of Removed Frames: {num_removed} ({round(100 * num_removed / len(self._orig_data.index), 3)}%)")
no_preds = self.data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].isna().any(axis=1).sum()
print(f"Count of No-Pred Frames: {no_preds} ({round(100 * no_preds / len(self.data.index), 3)}%)")
num_cycles = self.data["cycle"].nunique()
print(f"Total Num of Cycles: {num_cycles}")
non_perfect = (self.data["bbox_error"] > 1e-7).sum() / len(self.data.index)
print(f"Non Perfect Predictions: {round(100 * non_perfect, 3)}%")
Static methods
load
def load(
time_config: 'TimingConfig',
csv_path: 'str'
) -> 'DataAnalyzer'
Create a DataAnalyzer object from a csv file containing experiment data,
regardless of whether it has been analyzed or not.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
time_config | TimingConfig | The timing configuration. | None |
csv_path | str | Path to the csv file containing the experiment data. | None |
View Source
@staticmethod
def load(time_config: TimingConfig, csv_path: str) -> DataAnalyzer:
"""
Create a DataAnalyzer object from a csv file containing experiment data,
regardless whether if it's analyzed or not.
Args:
time_config (TimingConfig): The timing configuration.
csv_path (str): Path to the csv file containing the experiment data.
"""
data = pd.read_csv(csv_path)
return DataAnalyzer(time_config, data)
Instance variables
unit
Methods
calc_anomalies
def calc_anomalies(
self,
no_preds: 'bool' = True,
min_bbox_error: 'float' = inf,
min_dist_error: 'float' = inf,
min_speed: 'float' = inf,
min_size: 'float' = inf,
remove_anomalies: 'bool' = False
) -> 'pd.DataFrame'
Calculate anomalies in the data based on specified criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
no_preds | bool | Flag indicating whether to consider instances with missing predictions. | None |
min_bbox_error | float | Minimum bounding box error threshold to consider as anomaly. | None |
min_dist_error | float | Minimum distance error threshold to consider as anomaly. | None |
min_speed | float | Minimum speed threshold to consider as anomaly. | None |
min_size | float | Minimum size threshold to consider as anomaly. | None |
remove_anomalies | bool | Flag indicating whether to remove the anomalies from the data. | None |
Returns:
Type | Description |
---|---|
pd.DataFrame | DataFrame containing the anomalies found in the data. |
View Source
def calc_anomalies(
self,
no_preds: bool = True,
min_bbox_error: float = np.inf,
min_dist_error: float = np.inf,
min_speed: float = np.inf,
min_size: float = np.inf,
remove_anomalies: bool = False,
) -> pd.DataFrame:
"""
Calculate anomalies in the data based on specified criteria.
Args:
no_preds (bool, optional): Flag indicating whether to consider instances with missing predictions.
min_bbox_error (float, optional): Minimum bounding box error threshold to consider as anomaly.
min_dist_error (float, optional): Minimum distance error threshold to consider as anomaly.
min_speed (float, optional): Minimum speed threshold to consider as anomaly.
min_size (float, optional): Minimum size threshold to consider as anomaly.
remove_anomalies (bool, optional): Flag indicating whether to remove the anomalies from the data.
Returns:
pd.DataFrame: DataFrame containing the anomalies found in the data.
"""
data = self.data
mask_speed = data["wrm_speed"] >= min_speed
mask_bbox_error = data["bbox_error"] >= min_bbox_error
mask_dist_error = data["worm_deviation"] >= min_dist_error
mask_worm_width = data["wrm_w"] >= min_size
mask_worm_height = data["wrm_h"] >= min_size
mask_no_preds = np.isfinite(data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()).all(axis=1) == False
mask_no_preds = no_preds & mask_no_preds
mask = mask_speed | mask_bbox_error | mask_dist_error | mask_worm_width | mask_worm_height | mask_no_preds
anomalies = data[mask].copy()
anomalies["speed_anomaly"] = mask_speed[mask]
anomalies["bbox_error_anomaly"] = mask_bbox_error[mask]
anomalies["dist_error_anomaly"] = mask_dist_error[mask]
anomalies["width_anomaly"] = mask_worm_width[mask]
anomalies["height_anomaly"] = mask_worm_height[mask]
anomalies["no_pred_anomaly"] = mask_no_preds[mask]
if remove_anomalies:
self.data = self.data[~mask]
return anomalies
calc_precise_error
def calc_precise_error(
self,
worm_reader: 'FrameReader',
background: 'np.ndarray',
diff_thresh=20
) -> 'None'
Calculate the precise error between the worm and the microscope view.
This error is segmentation based, and measures the proportion of worm's head that is outside of the view of the microscope. Note that this calculation might take a while.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
worm_reader | FrameReader | Images of the worm at each frame, cropped to the size of the bounding box which was detected around the worm. |
None |
background | np.ndarray | The background image of the entire experiment. | None |
diff_thresh | int | Difference threshold to differentiate between the background and foreground. A foreground object is detected if the pixel value difference with the background is greater than this threshold. |
None |
View Source
def calc_precise_error(
    self,
    worm_reader: FrameReader,
    background: np.ndarray,
    diff_thresh=20,
) -> None:
    """
    Calculate the precise error between the worm and the microscope view.

    This error is segmentation based, and measures the proportion of worm's head that is
    outside of the view of the microscope. Note that this calculation might take a while.

    Args:
        worm_reader (FrameReader): Images of the worm at each frame, cropped to the size of
            the bounding box which was detected around the worm.
        background (np.ndarray): The background image of the entire experiment.
        diff_thresh (int): Difference threshold to differentiate between the background and
            foreground. A foreground object is detected if the pixel value difference with
            the background is greater than this threshold.
    """
    log = self._orig_data
    frame_nums = log["frame"].to_numpy().astype(np.int32, copy=False)
    worm_bboxes = log[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()
    view_bboxes = log[["mic_x", "mic_y", "mic_w", "mic_h"]].to_numpy()

    errors = ErrorCalculator.calculate_precise(
        background=background,
        worm_bboxes=worm_bboxes,
        mic_bboxes=view_bboxes,
        frame_nums=frame_nums,
        worm_reader=worm_reader,
        diff_thresh=diff_thresh,
    )

    log["precise_error"] = errors
    # mirror the newly computed errors into the (possibly filtered) working table,
    # matching rows by their frame number
    rows = self.data["frame"].to_numpy(dtype=int, copy=False)
    self.data["precise_error"] = errors[rows]
calc_precise_error_experimental
def calc_precise_error_experimental(
self,
worm_reader: 'FrameReader',
background: 'np.ndarray',
diff_thresh=20,
num_workers: 'int' = None,
chunk_size: 'int' = 2000
) -> 'None'
Calculate the precise error between the worm and the microscope view.
This error is segmentation based, and measures the proportion of worm's head that is outside of the view of the microscope. Note that this calculation might take a while.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
worm_reader | FrameReader | Images of the worm at each frame, cropped to the size of the bounding box which was detected around the worm. |
None |
background | np.ndarray | The background image of the entire experiment. | None |
diff_thresh | int | Difference threshold to differentiate between the background and foreground. A foreground object is detected if the pixel value difference with the background is greater than this threshold. |
None |
num_workers | int | The number of workers to use for parallel processing. If None, the number of workers is determined automatically. |
None |
chunk_size | int | The size of each processing chunk. | None |
View Source
def calc_precise_error_experimental(
    self,
    worm_reader: FrameReader,
    background: np.ndarray,
    diff_thresh=20,
    num_workers: int | None = None,
    chunk_size: int = 2000,
) -> None:
    """
    Calculate the precise error between the worm and the microscope view.

    This error is segmentation based, and measures the proportion of worm's head that is
    outside of the view of the microscope. Note that this calculation might take a while.

    Args:
        worm_reader (FrameReader): Images of the worm at each frame, cropped to the size of the bounding box
            which was detected around the worm.
        background (np.ndarray): The background image of the entire experiment.
        diff_thresh (int): Difference threshold to differentiate between the background and foreground.
            A foreground object is detected if the pixel value difference with the background is greater than this threshold.
        num_workers (int, optional): The number of workers to use for parallel processing.
            If None, the number of workers is determined automatically.
        chunk_size (int, optional): The size of each processing chunk.
    """
    frames = self._orig_data["frame"].to_numpy().astype(int, copy=False)
    wrm_bboxes = self._orig_data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()
    mic_bboxes = self._orig_data[["mic_x", "mic_y", "mic_w", "mic_h"]].to_numpy()

    # rows without a finite worm prediction cannot be evaluated; they keep the default error of 1
    errors = np.ones_like(frames, dtype=float)
    mask = np.isfinite(wrm_bboxes).all(axis=1)
    wrm_bboxes = wrm_bboxes[mask]
    mic_bboxes = mic_bboxes[mask]
    frames = frames[mask]

    # BUGFIX: when len(frames) < chunk_size the original integer division yielded 0,
    # which makes np.array_split raise ValueError; always use at least one section.
    num_sections = max(1, len(frames) // chunk_size)
    wrm_bboxes_list = np.array_split(wrm_bboxes, num_sections, axis=0)
    mic_bboxes_list = np.array_split(mic_bboxes, num_sections, axis=0)
    frames_list = np.array_split(frames, num_sections)

    # TODO: add non-multithreaded case whenever num_workers=0
    num_workers = adjust_num_workers(len(frames), chunk_size, num_workers)

    def calc_error(idx: int) -> np.ndarray:
        # processes one chunk; closes over the per-chunk arrays prepared above
        return ErrorCalculator.calculate_precise(
            background=background,
            worm_bboxes=wrm_bboxes_list[idx],
            mic_bboxes=mic_bboxes_list[idx],
            frame_nums=frames_list[idx],
            worm_reader=worm_reader,
            diff_thresh=diff_thresh,
        )

    results = concurrent.thread_map(
        calc_error,
        list(range(len(wrm_bboxes_list))),
        max_workers=num_workers,
        chunksize=1,
        desc="Extracting bboxes",
        unit="fr",
        leave=False,
    )

    # set the error in the original data
    errors[mask] = np.concatenate(results)
    self._orig_data["precise_error"] = errors
    # copy relevant error entries into the work data
    idx = self.data["frame"].to_numpy(dtype=int, copy=False)
    self.data["precise_error"] = errors[idx]
change_unit
def change_unit(
self,
unit: 'str'
)
Changes the unit of time and distance in the data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
unit | str | The new unit of time to convert into. Can be "frame" or "sec". If "sec" is chosen, the time will be converted to seconds, and the distance metric is micrometer. If "frame" is chosen, the time will be in frames, and the distance metric is pixels. |
None |
View Source
def change_unit(self, unit: str):
    """
    Changes the unit of time and distance in the data.

    Args:
        unit (str): The new unit of time to convert into. Can be "frame" or "sec".
            If "sec" is chosen, the time will be converted to seconds, and the distance metric is micrometer.
            If "frame" is chosen, the time will be in frames, and the distance metric is pixels.
    """
    assert unit in ["frame", "sec"]
    if self._unit == unit:
        return  # already in the requested unit; nothing to do

    if unit == "sec":
        # frame/px -> sec/um
        dist_factor = self.time_config.mm_per_px * 1000
        time_factor = self.time_config.ms_per_frame / 1000
    else:
        # sec/um -> frame/px
        dist_factor = self.time_config.px_per_mm / 1000
        time_factor = self.time_config.frames_per_sec

    table = self.data
    table["time"] *= time_factor

    # every distance-valued column group gets the same scaling
    distance_groups = (
        ["plt_x", "plt_y"],
        ["wrm_x", "wrm_y", "wrm_w", "wrm_h"],
        ["mic_x", "mic_y", "mic_w", "mic_h"],
        ["cam_x", "cam_y", "cam_w", "cam_h"],
        ["wrm_center_x", "wrm_center_y"],
        ["mic_center_x", "mic_center_y"],
        ["worm_deviation_x", "worm_deviation_y", "worm_deviation"],
    )
    for group in distance_groups:
        table[group] *= dist_factor

    # speed = distance / time, so it scales by the ratio of the two factors
    table[["wrm_speed_x", "wrm_speed_y", "wrm_speed"]] *= dist_factor / time_factor

    self._unit = unit
    self.data = table
clean
def clean(
self,
trim_cycles: 'bool' = False,
imaging_only: 'bool' = False,
bounds: 'tuple[float, float, float, float]' = None
) -> 'None'
Clean the data by the provided parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trim_cycles | bool | whether to remove the first and the last cycles from the data. | None |
imaging_only | bool | Flag indicating whether to include only imaging phases in the analysis. | None |
bounds | tuple[float, float, float, float] | The legal bounds for worm movement, as (x_min, y_min, x_max, y_max). | None |
View Source
def clean(
self,
trim_cycles: bool = False,
imaging_only: bool = False,
bounds: tuple[float, float, float, float] = None,
) -> None:
"""
Clean the data by the provided parameters.
Args:
trim_cycles (bool): whether to remove the first and the last cycles from the data.
imaging_only (bool): Flag indicating whether to include only imaging phases in the analysis.
legal_bounds (tuple[float, float, float, float]): The legal bounds for worm movement.
"""
data = self.data
if imaging_only:
mask = data["phase"] == "imaging"
data = data[mask]
if bounds is not None:
has_pred = np.isfinite(data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].to_numpy()).all(axis=1)
mask_wrm = has_pred # if there is a prediction for a frame then look at worm bbox
mask_wrm &= (data["wrm_x"] >= bounds[0]) & (data["wrm_x"] + data["wrm_w"] <= bounds[2])
mask_wrm &= (data["wrm_y"] >= bounds[1]) & (data["wrm_y"] + data["wrm_h"] <= bounds[3])
mask_mic = ~has_pred # if there is no prediction for a frame then look at micro bbox
mask_mic &= (data["mic_x"] >= bounds[0]) & (data["mic_x"] + data["mic_w"] <= bounds[2])
mask_mic &= (data["mic_y"] >= bounds[1]) & (data["mic_y"] + data["mic_h"] <= bounds[3])
data = data[mask_wrm | mask_mic]
if trim_cycles:
mask = data["cycle"] != 0
mask &= data["cycle"] != data["cycle"].max()
data = data[mask]
self.data = data
column_names
def column_names(
self
) -> 'list[str]'
Returns a list of all column names in the analyzed data.
Returns:
Type | Description |
---|---|
list[str] | A list of column names. |
View Source
def column_names(self) -> list[str]:
    """
    Returns a list of all column names in the analyzed data.

    Returns:
        list[str]: A list of column names.
    """
    return list(self.data.columns)
describe
def describe(
self,
columns: 'list[str]' = None,
num: 'int' = 3,
percentiles: 'list[float]' = None
) -> 'pd.DataFrame'
Generate descriptive statistics of the specified columns in the table containing the data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
columns | list[str] | List of column names to include in the analysis. If None, all columns will be included. | None |
num | int | Number of evenly spaced percentiles to include in the analysis. If percentiles is not None, this parameter is ignored. | None |
percentiles | list[float] | List of specific percentiles to include in the analysis. If None, evenly spaced percentiles will be used. | None |
Returns:
Type | Description |
---|---|
pd.DataFrame | A DataFrame containing the descriptive statistics of the specified columns. |
View Source
def describe(self, columns: list[str] = None, num: int = 3, percentiles: list[float] = None) -> pd.DataFrame:
    """
    Generate descriptive statistics of the specified columns in the table containing the data.

    Args:
        columns (list[str], optional): List of column names to include in the analysis. If None, all columns will be included.
        num (int, optional): Number of evenly spaced percentiles to include in the analysis. If percentiles is not None, this parameter is ignored.
        percentiles (list[float], optional): List of specific percentiles to include in the analysis. If None, evenly spaced percentiles will be used.

    Returns:
        pd.DataFrame: A DataFrame containing the descriptive statistics of the specified columns.
    """
    selected = self.column_names() if columns is None else columns
    if percentiles is None:
        # num evenly spaced interior points of [0, 1] (endpoints excluded)
        percentiles = np.linspace(start=0, stop=1.0, num=num + 2)[1:-1]
    return self.data[selected].describe(percentiles)
initialize
def initialize(
self,
period: 'int' = 10
)
Initializes the data analyzer.
It's essential to call this function if the class was created from a non-analyzed log data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
period | int | The period for calculating speed in frames. The speed is calculated by measuring the distance between current frame and period frames before. |
None |
View Source
def initialize(self, period: int = 10):
    """
    Initializes the data analyzer.
    It's essential to call this function if the class was created from a non-analyzed log data.

    Args:
        period (int): The period for calculating speed in frames.
            The speed is calculated by measuring the distance between current frame and period frames before.
    """
    df = self._orig_data
    df["time"] = df["frame"]
    df["cycle_step"] = df["frame"] % self.time_config.cycle_frame_num

    # derive all analysis columns from the raw log
    df = DataAnalyzer._calc_centers(df)
    df = DataAnalyzer._calc_speed(df, period)
    df = DataAnalyzer._calc_worm_deviation(df)
    df = DataAnalyzer._calc_errors(df)

    self._orig_data = df.round(5)
    # working copy starts out identical to the freshly analyzed data
    self.data = self._orig_data.copy()
print_stats
def print_stats(
self
) -> 'None'
Prints various statistics related to the data.
This method calculates and prints the following statistics: - Count of Removed Frames: The number of frames that were removed from the original data. - Total Count of No Pred Frames: The number of frames where the predictions are missing. - Total Num of Cycles: The number of unique cycles in the data. - Non Perfect Predictions: The percentage of predictions that are not perfect.
View Source
def print_stats(self) -> None:
    """
    Prints various statistics related to the data.

    This method calculates and prints the following statistics:
    - Count of Removed Frames: The number of frames that were removed from the original data.
    - Total Count of No Pred Frames: The number of frames where the predictions are missing.
    - Total Num of Cycles: The number of unique cycles in the data.
    - Non Perfect Predictions: The percentage of predictions that are not perfect.
    """
    total_rows = len(self._orig_data.index)
    removed = total_rows - len(self.data.index)
    print(f"Count of Removed Frames: {removed} ({round(100 * removed / total_rows, 3)}%)")

    missing = self.data[["wrm_x", "wrm_y", "wrm_w", "wrm_h"]].isna().any(axis=1).sum()
    print(f"Count of No-Pred Frames: {missing} ({round(100 * missing / len(self.data.index), 3)}%)")

    print(f"Total Num of Cycles: {self.data['cycle'].nunique()}")

    # a prediction is "perfect" when its bbox error is essentially zero
    imperfect_ratio = (self.data["bbox_error"] > 1e-7).sum() / len(self.data.index)
    print(f"Non Perfect Predictions: {round(100 * imperfect_ratio, 3)}%")
remove_cycle
def remove_cycle(
self,
cycles: 'int | list[int]'
)
Remove the specified cycles from the data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cycles | int | list[int] | The cycle(s) to remove from the data. |
View Source
def remove_cycle(self, cycles: int | list[int]):
"""
Remove the specified cycles from the data.
Args:
cycles (int | list[int]): The cycle(s) to remove from the data.
"""
if isinstance(cycles, int):
cycles = [cycles]
mask = self.data["cycle"].isin(cycles)
self.data = self.data[~mask]
reset_changes
def reset_changes(
self
)
Reset the working data to its original state.
Note that this also resets the unit of time and distance back to "frame", since the original data is stored in frame/pixel units.
View Source
def reset_changes(self):
    """
    Reset the working data to its original state.

    Note that this also resets the unit of time and distance back to "frame",
    since the original data is stored in frame/pixel units.
    (The previous docstring incorrectly claimed the unit was NOT reset, while the
    code has always set it back to "frame".)
    """
    self.data = self._orig_data.copy()
    self._unit = "frame"
save
def save(
self,
path: 'str'
) -> 'None'
Save the full analyzed data to a csv file.
View Source
def save(self, path: str) -> None:
    """
    Save the full analyzed data to a csv file.
    """
    # persist the complete original table, not the possibly-filtered working copy
    self._orig_data.to_csv(path, index=False)