# File: TL_Pysces.py
# Project: ThinLayers
# Author: Johann Rohwer (j.m.rohwer@gmail.com)
# License: BSD-2 clause
# Copyright (c) 2022 Stellenbosch University
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import os
import math
from typing import Union, Optional
from pyenzyme.thinlayers.TL_Base import BaseThinLayer
from pyenzyme.enzymeml.core.exceptions import SpeciesNotFoundError
import io
from contextlib import redirect_stdout
_PYSCES_IMPORT_ERROR = None
_PYSCES_IMPORT_STREAM = io.StringIO()
try:
with redirect_stdout(_PYSCES_IMPORT_STREAM):
import pysces
import lmfit
except ModuleNotFoundError as e:
_PYSCES_IMPORT_ERROR = """
ThinLayerPysces is not available.
To use it, please install the following dependencies:
{}
""".format(
e
)
class ThinLayerPysces(BaseThinLayer):
    """Thin layer connecting an EnzymeML document to PySCeS/lmfit fitting."""

    def __init__(
        self,
        path,
        model_dir: str,
        measurement_ids: Union[str, list] = "all",
        init_file: Optional[str] = None,
    ):
        """Sets up the thin layer from an EnzymeML document.

        Args:
            path: Path to the EnzymeML document (forwarded to BaseThinLayer).
            model_dir: Directory used to store the SBML/PSC model files.
            measurement_ids: Measurement ids to use, or "all".
            init_file: Optional initialization file (forwarded to BaseThinLayer).

        Raises:
            RuntimeError: If the pysces/lmfit dependencies are not installed.
        """
        # On the first instantiation, replay the console output that pysces
        # produced while being imported (captured at module load time).
        global _PYSCES_IMPORT_STREAM
        if _PYSCES_IMPORT_STREAM is not None:
            print(_PYSCES_IMPORT_STREAM.getvalue())
            _PYSCES_IMPORT_STREAM = None

        # Fail early when the optional dependencies could not be imported.
        if _PYSCES_IMPORT_ERROR:
            raise RuntimeError(_PYSCES_IMPORT_ERROR)

        super().__init__(path, measurement_ids, init_file)

        # Convert the model to PSC format and collect the experimental data.
        self._get_pysces_model(model_dir)
        self._get_experimental_data()
# ! Interface
[docs] def optimize(self, method="leastsq"):
"""Performs optimization of the given parameters
Args:
method (str): lmfit optimization algorithm, see https://lmfit.github.io/lmfit-py/fitting.html#choosing-different-fitting-methods
"""
# Initialize the model parameters
parameters = self._initialize_parameters()
# Perform optimization
self.minimizer = lmfit.Minimizer(self._calculate_residual, parameters)
# Set estimated parameters to self
self.parameters = parameters
return self.minimizer.minimize(method=method)
[docs] def write(self):
"""Writes the estimated parameters to a copy of the EnzymeMLDocument"""
nu_enzmldoc = self.enzmldoc.copy(deep=True)
results = self.minimizer.result.params.valuesdict()
for name, value in results.items():
# names contain information about reaction and parameter
# Pattern: rX_name
splitted = name.split("_")
reaction_id = splitted[0]
parameter_id = "_".join(splitted[1::])
# Fetch reaction and parameter
try:
reaction = nu_enzmldoc.getReaction(reaction_id)
except SpeciesNotFoundError:
global_param = nu_enzmldoc.global_parameters.get(name)
global_param.value = value
continue
if reaction.model:
parameter = reaction.model.getParameter(parameter_id)
parameter.value = value
else:
raise TypeError(f"Reaction {reaction_id} has no model to add values to")
return nu_enzmldoc
# ! Helper methods
def _initialize_parameters(self):
"""Adds all parameters to an lmfit Parameters instance.
Raises:
ValueError: Raised when parameter posesses neither an initial value or value attribute
"""
# Initialize lmfit parameters
parameters = lmfit.Parameters()
# Add global parameters
for global_param in self.global_parameters.values():
# need to also catch zero initial values
if global_param.value is not None:
parameters.add(
f"{global_param.name}",
global_param.value,
vary=not global_param.constant,
min=global_param.lower,
max=global_param.upper,
)
elif global_param.initial_value is not None:
parameters.add(
f"{global_param.name}",
global_param.initial_value,
vary=not global_param.constant,
min=global_param.lower,
max=global_param.upper,
)
else: # parameter is
raise ValueError(
f"Neither initial_value nor value given for parameter {global_param.name} in global parameters"
)
# Consistency check
for reaction_id, (model, _) in self.reaction_data.items():
# Apply parameters to lmfit parameters
for parameter in model.parameters:
if parameter.is_global:
continue
# need to also catch zero initial values
if parameter.value is not None:
parameters.add(
f"{reaction_id}_{parameter.name}",
parameter.value,
vary=not parameter.constant,
min=parameter.lower,
max=parameter.upper,
)
elif parameter.initial_value is not None:
parameters.add(
f"{reaction_id}_{parameter.name}",
parameter.initial_value,
vary=not parameter.constant,
min=parameter.lower,
max=parameter.upper,
)
else: # parameter is
raise ValueError(
f"Neither initial_value nor value given for parameter {parameter.name} in reaction {reaction_id}"
)
return parameters
def _get_experimental_data(self):
"""Extract measurement data from the EnzymeML document"""
# Initialize data structure to store experimental data
raw_data, self.inits = [], []
for measurement_data in self.data.values():
# Gather data
data: pd.DataFrame = measurement_data["data"]
init_concs: dict = measurement_data["initConc"]
# Collect raw_data
raw_data.append(data.drop("time", axis=1))
# Collect initial concentrations for simulation
init_mapping = {
"time": data.time,
**{species_id: value for species_id, (value, _) in init_concs.items()},
}
self.inits.append(init_mapping)
# Concatenate and clean all DataFrames
self.experimental_data = pd.concat(raw_data)
self.experimental_data.reset_index(drop=True, inplace=True)
self.cols = list(self.experimental_data.columns)
def _get_pysces_model(self, model_dir: str):
"""Converts an EnzymeMLDocument to a PySCeS model."""
# Set up the PySCeS directory structure
model_dir = os.path.join(model_dir)
os.makedirs(model_dir, exist_ok=True)
# Write the raw XMLString to the dir
sbmlfile_name = f"{self.enzmldoc.name.replace(' ', '_')}.xml"
sbmlfile_path = os.path.join(model_dir, sbmlfile_name)
with open(sbmlfile_path, "w") as file:
file.write(self.sbml_xml)
# First, convert the EnzymeML model to a PySCeS model
pscfile_path = sbmlfile_path + ".psc"
if not (
(os.path.exists(pscfile_path))
and (os.path.getmtime(pscfile_path) > os.path.getmtime(self.filepath))
):
pysces.interface.convertSBML2PSC(sbmlfile_name, sbmldir=model_dir, pscdir=model_dir)
# Finally, load the PSC model
self.model = pysces.model(sbmlfile_name, dir=model_dir)
# work around https://github.com/PySCeS/pysces/issues/79
if (
self.model.__KeyWords__["Species_In_Conc"]
and self.model.__KeyWords__["Output_In_Conc"]
):
for comp in self.model.__compartments__:
self.model.__compartments__[comp]["size"] = 1.0
setattr(self.model, comp, 1.0)
setattr(self.model, f"{comp}_init", 1.0)
def _calculate_residual(self, parameters) -> np.ndarray:
"""Function that will be optimized"""
simulated_data = self._simulate_experiment(parameters)
simulated_data = simulated_data.drop(
simulated_data.columns.difference(self.cols), axis=1
)
return np.array(self.experimental_data - simulated_data)
def _simulate_experiment(self, parameters):
"""Performs simulation based on the PySCeS model"""
self.model.SetQuiet()
self.model.__dict__.update(parameters.valuesdict())
# intialize collection
output = []
# Now iterate over all initial concentrations and simulate accordingly
for init_concs in self.inits:
for species_id, value in init_concs.items():
if type(value) in [float, int] and value == 0:
# Catch initial values with zero and set them ot a small number
value = 1.0e-9
if species_id in self.model.species:
# Add if given in the model
setattr(self.model, f"{species_id}_init", value)
elif species_id == "time":
# Add time to the model
time_values, num_reps = self._get_replicate_info(value)
self.model.sim_time = time_values
else:
setattr(self.model, species_id, value)
# Simulate the experiment and save results
self.model.Simulate(userinit=1)
for i in range(num_reps):
output.append(
[getattr(self.model.sim, species) for species in self.model.species]
)
return pd.DataFrame(np.hstack(output).T, columns=self.model.species)
def _get_replicate_info(self, time):
i = 1
# get index in time series where 2nd replicate starts
while i < len(time) and time[i] > time[i - 1]:
i += 1
num_reps = len(time) // i
if num_reps > 1:
for k in range(1, num_reps):
assert np.allclose(
time[:i], time[k * i : k * i + i]
), "Time points of replicates don't match!"
return (time[:i].values, num_reps)
[docs] def plot_fit(self):
"""Plots the simulation and data for the fitted parameters."""
# check that optimization has been done
assert hasattr(self, "minimizer") and hasattr(
self.minimizer, "result"
), "Please run optimize() first."
measurements = list(self.data.keys())
num = len(measurements)
numcols = 3
numrows = math.ceil(num / numcols)
fig, ax = plt.subplots(nrows=numrows, ncols=numcols, figsize=(9, 2.8 * numrows))
for i in range(num):
if num <= numcols:
theax = ax[i]
else:
theax = ax[i // numcols, i % numcols]
self._plot_measurement(measurements[i], i, ax=theax)
# remove empty axes
if num % numcols != 0:
empty = numcols - num % numcols
for i in range(-empty, 0):
if num <= numcols:
ax[i].set_visible(False)
else:
ax[num // numcols, i].set_visible(False)
fig.tight_layout()
def _plot_measurement(self, meas_id, i=0, ax=None):
"""Plots a single measurement with simulation."""
time_unit = self.enzmldoc.measurement_dict[meas_id].global_time_unit
conc_unit = self.enzmldoc.reactant_dict[self.cols[0]].unit
if not ax:
fig, ax = plt.subplots()
sim = self._simulate_measurement(meas_id, self.minimizer.result.params, self.model)
sim.plot("Time", self.cols, ax=ax, legend=False)
if i == 0:
ax.figure.legend(loc='upper left', bbox_to_anchor=(1.0, 0.96))
ax.set_prop_cycle(None)
self.data[meas_id]["data"].plot("time", self.cols, style="^", legend=False, ax=ax)
ax.set_title(meas_id)
ax.set_xlabel(f"Time ({time_unit})")
ax.set_ylabel(f"Concentration ({conc_unit})")
def _simulate_measurement(self, meas_id, parameters, model):
"""Performs simulation based on the PySCeS model"""
data = self.data[meas_id]["data"]
model.SetQuiet()
model.__dict__.update(parameters.valuesdict())
# Now iterate over all initial concentrations and simulate accordingly
init_concs = self._map_inits(meas_id)
for species_id, value in init_concs.items():
if species_id in model.species:
# Add if given in the model
setattr(model, f"{species_id}_init", value)
else:
setattr(model, species_id, value)
# Simulate the experiment and save results
end_time = data.time.iloc[-1]
# simulate 200 points for smooth curve
model.doSim(end=end_time, points=200)
result = pd.DataFrame(
np.array([getattr(model.sim, species) for species in self.cols]).T,
columns=self.cols,
)
result["Time"] = self.model.sim.Time
return result
def _map_inits(self, meas_id):
init_concs = self.data[meas_id]["initConc"]
return {species_id: value for species_id, (value, _) in init_concs.items()}