Source code for neat.tools.kernelextraction

# -*- coding: utf-8 -*-
#
# kernelextraction.py
#
# This file is part of NEST.
#
# Copyright (C) 2004 The NEST Initiative
#
# NEST is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# NEST is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with NEST.  If not, see <http://www.gnu.org/licenses/>.

import numpy as np
import scipy.linalg as la
from scipy.cluster.vq import kmeans

import math
import copy
from typing import Literal


[docs] class Kernel: """ Implements a kernel as a superposition of exponentials: .. math:: k(t) = \sum_n c_n e^{ - a_n t} Kernels can be added and subtracted, as this class overloads the __add__ and __subtract__ functions. They can be evaluated as a function of time by calling the object with a time array. They can be evaluated in the Fourrier domain with `Kernel.ft` Parameters ---------- kernel: dict, float, neat.Kernel, tuple or list If dict, has the form {'a': `np.array`, 'c': `np.array`}. If float, sets `c` single exponential prefactor and assumes `a` is 1 kHz. If `neat.Kernel`, copies the object. If tuple or list, sets 'a' as first element and 'c' as last element. Attributes ---------- a: np.array of float or complex The exponential coefficients (kHz) c: np.array of float or complex The exponential prefactors k_bar: float The total surface area under the kernel """ def __init__(self, kernel): # set kernel time scales and exponential prefactors if isinstance(kernel, dict): self.a = copy.deepcopy(kernel["a"]) self.c = copy.deepcopy(kernel["c"]) elif isinstance(kernel, float) or isinstance(kernel, int): self.a = np.array([1.0]) self.c = np.array([kernel]).astype(float) elif isinstance(kernel, Kernel): self.a = copy.deepcopy(kernel.a) self.c = copy.deepcopy(kernel.c) else: self.a = copy.deepcopy(kernel[0]) self.c = copy.deepcopy(kernel[1]) if isinstance(self.a, float): self.a = np.array([self.a]) elif not isinstance(self.a, np.ndarray): self.a = np.array(self.a) if isinstance(self.c, float): self.c = np.array([self.c]) elif not isinstance(self.c, np.ndarray): self.c = np.array(self.c) def __getitem__(self, ind): if ind == 0: return self.a elif ind == 1: return self.c elif ind == "a": return self.a elif ind == "c": return self.c elif ind == "alphas": return self.a elif ind == "gammas": return self.c else: raise IndexError("Index should be '0' or '1'") def __call__(self, t_arr): return ( np.dot( np.exp(-t_arr[:, np.newaxis] * self.a[np.newaxis, :]), self.c[:, np.newaxis], ) .flatten() .real ) def __add__(self, kernel): if kernel.a.shape[0] == self.a.shape[0] and np.allclose(kernel.a, self.a): a = copy.copy(self.a) c = kernel.c + self.c else: a = np.concatenate((self.a, kernel.a)) c = np.concatenate((self.c, kernel.c)) return Kernel((a, c)) def __sub__(self, kernel): if kernel.a.shape[0] == self.a.shape[0] and np.allclose(kernel.a, self.a): a = copy.copy(self.a) c = self.c - kernel.c else: a = np.concatenate((self.a, kernel.a)) c = np.concatenate((self.c, -kernel.c)) return Kernel((a, c)) def get_k_bar(self): """ The total surface under the kernel """ return np.sum(self.c / self.a).real def set_k_bar(self, kk): raise AttributeError( "`k_bar` is a read-only attribute, adjust attribute `c` " + "by multiplying with a factor to change `k_bar`" ) k_bar = property(get_k_bar, set_k_bar) def __str__(self, as_timescale=False): if as_timescale: return ( "t = " + np.array2string(1.0 / self.a, precision=4, max_line_width=1000) + "\n" + "c = " + np.array2string(self.c, precision=4, max_line_width=1000) ) else: return ( "a = " + np.array2string(self.a, precision=4, max_line_width=1000) + "\n" + "c = " + np.array2string(self.c, precision=4, max_line_width=1000) ) def __repr__(self): return repr({"a": self.a, "c": self.c})
[docs] def t(self, t_arr): """ Evaluates the kernel in the time domain Parameters ---------- t_arr: `np.array` of `float` the time array in ``ms`` at which the kernel is evaluated Returns ------- np.array of float the temporal kernel """ return self(t_arr)
def diff(self, t_arr=None): """ Computes the time derivative of the kernel. If a time array is provided, returns an array of corresponding kernel values. If nothing is provided, returns a kernel representing the time derivative. Parameters ---------- t_arr: `np.array` (optional) the time array Returns ------- `np.array` or `neat.Kernel` the differentiated kernel """ if t_arr is None: return Kernel( { "a": self.a, "c": -self.a * self.c, } ) else: return ( np.dot( -self.a[np.newaxis, :] * np.exp(-t_arr[:, np.newaxis] * self.a[np.newaxis, :]), self.c[:, np.newaxis], ) .flatten() .real )
[docs] def ft(self, s_arr): """ Evaluates the kernel in the Fourrier domain Parameters ---------- s_arr: np.array of complex The frequencies in ``Hz`` at which the kernel is to be evaluated Returns ------- np.array of complex The Fourrier transform of the kernel """ return np.sum( self.c[:, None] * 1e3 / (self.a[:, None] * 1e3 + s_arr[None, :]), 0 )
def fit_c(self, t_arr, func_arr, w=None): """ Perform a linear least squares fit of the exponential prefactors in the time domain Parameters ---------- t_arr: `np.array` of float the time array in ``ms`` at which the kernel is evaluated k_arr: `np.array` of float the to be fitted kernel array Returns ------- `np.ndarray` of float (`ndim = 2`) The feature matrix """ if w is None: w = np.ones_like(t_arr) A = np.exp(-t_arr[:, None] * self.a[None, :]) self.c = np.linalg.lstsq(w[:, None] * A, w * func_arr, rcond=None)[0]
class Fitter(object): def der(self, x, arr): dx = x[1] - x[0] diffarr = (arr[1:] - arr[0:-1]) / dx return diffarr, x[0:-1] + dx / 2 def derder(self, x, arr): dx = x[1] - x[0] diffarr, _ = self.der(x, arr) diffdiffarr = (diffarr[1:] - diffarr[0:-1]) / dx return diffdiffarr, x[1:-1] def zerocrossing(self, x, arr): arr = copy.copy(arr) inds = np.where(np.diff(np.sign(arr)))[0] return inds, x[inds] def find_nearest(self, array, value): idx = (np.abs(array - value)).argmin() return array[idx], idx class ExpFitter(Fitter): def sumExp(self, x, a, c, flat=True): if flat: return (np.exp(x[:, None] * a[:, None].T).dot(c[:, None])).flatten().real else: return np.exp(x[:, None] * a[:, None].T).dot(c[:, None]).real def PronyExpFit(self, deg, x, y): """ Construct a sum of exponentials fit to a given time-sequence y by using prony's method input: [deg]: int, number of exponentials [x]: numpy array, sequence of regularly spaced points at which y is evaluated [y]: numpy array, sequence output: [a]: numpy array, exponential coefficient [c]: numpy array, exponentail magnitudes [rms]: float, root mean square error of the data """ # deg += 1 # stepsize h = x[1] - x[0] # Build matrix A = la.hankel(y[:-deg], y[-deg - 1 :]) a = -A[:, :deg] b = A[:, deg] # Solve it s = la.lstsq(a, b)[0] # Solve polynomial p = np.flipud(np.hstack((s, 1))) u = np.roots(p) # Only keep roots in unit circle inds = np.where( np.logical_and( (np.abs(u) < 1.0), np.logical_not(np.logical_and(np.imag(u) == 0.0, np.real(u) <= 0.0)), ) )[0] u = u[inds] # Calc exponential factors a = np.log(u) / h # Build power matrix A = np.power( (np.ones((len(y), 1)) * u[:, None].T), np.arange(len(y))[:, None] * np.ones((1, len(inds))), ) # solve it f = la.lstsq(A, y)[0] # calc amplitudes c = f / np.exp(a * x[0]) # build x, approx and calc rms approx = self.sumExp(x, a, c).real rms = np.sqrt(((approx - y) ** 2).sum() / len(y)) return a, c, rms def construct_Hankel_matrices(self, y): ind0 = int(len(y) / 2) # original and shifted hankel matrix H0 = la.hankel(y[0:ind0], y[ind0 - 1 : 2 * ind0 - 1]) H1 = la.hankel(y[1 : ind0 + 1], y[ind0 : 2 * ind0]) return H0, H1 # def Z_McE_ExpFit(self, x, y, deg=2): # # construct the Hankel matrices # H0, H1 = self.construct_Hankel_matrices(y) # # compute the singular value decomposition # U, s, Vh = la.svd(H0) # U_ = U[:, 0:deg] # Vh_ = Vh[0:deg, :] # s_ = s[0:deg] # # compute system matrix # F0 = np.diag(1./np.sqrt(s_)).dot(U_.T) # F1 = Vh_.T.dot(np.diag(1./np.sqrt(s_))) # A = F0.dot(H1.dot(F1)) # # find eigenvalues of system matrix # u, v = la.eig(A) # # system time-scales (inverse) # alphas = np.log(u) / dx # return alphas def fitExp_Z_McE(self, x, y, rtol=1e-2, maxdeg=10): deg = 1 rms = 1.0 # stepsize dx = x[1] - x[0] # construct the Hankel matrices H0, H1 = self.construct_Hankel_matrices(y) # compute the singular value decomposition U, s, Vh = la.svd(H0) # loop over increasing number of exponentials while rms > rtol and deg < maxdeg: U_ = U[:, 0:deg] Vh_ = Vh[0:deg, :] s_ = s[0:deg] # compute system matrix F0 = np.diag(1.0 / np.sqrt(s_)).dot(U_.T) F1 = Vh_.T.dot(np.diag(1.0 / np.sqrt(s_))) A = F0.dot(H1.dot(F1)) # find eigenvalues of system matrix u, v = la.eig(A) # system time-scales (inverse) alphas = np.log(u.real) / dx # solve weights A = np.exp(x[:, None] * alphas[None, :] * dx) gammas = la.lstsq(A, y)[0] # compute rmse approx = self.sumExp(x, alphas, gammas) rms = np.sqrt(((approx - y) ** 2).sum() / len(y)) # increase degree deg += 1 return alphas, gammas, rms def reduceSeries(self, a, c, x, y, rtol=1e-2): """ Reduces the number of exponential terms in a series, till a given tolerance is reached input: [a]: numpy array of exponential timescales [c]: numpy array of exponential magnitudes [x]: numpy array of x-values at which the function is evaluated [y]: numpy array of function values [rtol]: float, relative tolerance given the largest function value output: [alpha]: exponential coefficients [gamma]: magnitudes [rms]: float, root mean square error """ k = 1 rms = 2 * rtol while rms > rtol and k <= len(a): sortind = np.argsort(np.abs(c))[::-1] alpha = a[sortind][0:k] gamma = c[sortind][0:k] approx = self.sumExp(x, alpha, gamma).real rms = np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) k += 1 return alpha, gamma, rms def fitExp(self, x, y, deg=30, rtol=1e-2, surface=False, A=None): a, c, rms = self.PronyExpFit(deg, x, y) alpha, gamma, rms = self.reduceSeries(a, c, x, y, rtol=rtol) if surface: dx = x[1] - x[0] if A == None: A = dx * np.sum(y) Afit = np.sum( gamma * (np.exp(alpha * x[-1]) - np.exp(alpha * x[0])) / alpha ) gamma = gamma * A / Afit approx = self.sumExp(x, alpha, gamma).real rms = np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) return alpha, gamma, rms class fExpFitter(Fitter): def sumFExp(self, s, alphas, gammas): return np.sum(self.fexps(s, alphas, gammas), 0) def fexps(self, s, alphas, gammas): return gammas[:, None] / (alphas[:, None] + s[None, :]) def trialFunFit(self, s, arr, alphas, pairs=None): # construct matrix for extended fitting problem A = np.concatenate( ( 1.0 / (s[:, None] + alphas[None, :]), arr[:, None] / (s[:, None] + alphas[None, :]), ), axis=1, ) # find auxiliary residues c = la.lstsq(A, arr)[0][-len(alphas) :] # find zeros of fitted auxiliary function H = np.diag(alphas) - np.dot( np.ones((len(alphas), 1), dtype=complex), c[None, :] ) alphanew = np.linalg.eig(H)[0] # find real residues Anew = 1.0 / (s[:, None] + alphanew[None, :]) cnew = la.lstsq(Anew, arr)[0] return alphanew, cnew, None def trialFunFit_constrained(self, s, arr, alphas, pairs, zerostart=False): deg = len(alphas) carr = np.concatenate((arr.real, arr.imag)) # construct matrix for extended fitting problem A = np.concatenate( ( 1.0 / (s[:, None] + alphas[None, :]), arr[:, None] / (s[:, None] + alphas[None, :]), ), axis=1, ) # implement the constraint pairsnew = np.concatenate((pairs, pairs)) for i, p in enumerate(pairsnew): if p: x1 = A[:, i] + A[:, i + 1] x2 = 1j * (A[:, i] - A[:, i + 1]) A[:, i] = x1 A[:, i + 1] = x2 A = np.concatenate((A.real, A.imag), axis=0) # find auxiliary residues c = la.lstsq(A, carr)[0][-len(alphas) :] # find zeros of fitted auxiliary function a = np.diag(alphas) b = np.ones(deg) # implement similarity transform for i, p in enumerate(pairs): if p: a[i : i + 2, i : i + 2] = np.array( [ [alphas[i].real, alphas[i].imag], [-alphas[i].imag, alphas[i].real], ] ) b[i : i + 2] = np.array([2, 0]) H = a.real - np.dot(b[:, None], c[None, :]) alphanew = np.linalg.eig(H)[0] inds = np.argsort(alphanew) alphanew = alphanew[inds] # indicates where pairs of complex conjugate poles occur auxarr = np.abs( (np.abs(alphanew[:-1]) - np.abs(alphanew[1:])) / np.abs(alphanew[:-1]) ) auxarr2 = np.abs(alphas.imag) > 1e-15 pairs = np.logical_and( np.concatenate((auxarr < 1e-15, np.zeros(1, dtype=bool))), auxarr2 ) # find residues Anew = 1.0 / (s[:, None] + alphanew[None, :]) for i, p in enumerate(pairs): if p: x1 = Anew[:, i] + Anew[:, i + 1] x2 = 1j * (Anew[:, i] - Anew[:, i + 1]) Anew[:, i] = x1 Anew[:, i + 1] = x2 Anew = np.concatenate((Anew.real, Anew.imag), axis=0) if zerostart: # enforce K(t=0)=0 constraint row1 = np.ones(2 * deg) for i, p in enumerate(pairs): if p: row1[i + 1] = 0 Anew = np.concatenate((np.ones((1, deg), dtype=complex), Anew), axis=0) carr = np.concatenate((np.zeros(1, dtype=complex), carr)) cnew = la.lstsq(Anew, carr)[0] cnew = np.array(cnew, dtype=complex) # recast cnew to complex values for i, p in enumerate(pairs): if p: cnew[i : i + 2] = np.array( [cnew[i] + 1j * cnew[i + 1], cnew[i] - 1j * cnew[i + 1]] ) return alphanew, cnew, pairs def fit_residues(self, s, arr, alphas, pairs): carr = np.concatenate((arr.real, arr.imag)) A = 1.0 / (s[:, None] + alphas[None, :]) for i, p in enumerate(pairs): if p: x1 = A[:, i] + A[:, i + 1] x2 = 1j * (A[:, i] - A[:, i + 1]) A[:, i] = x1 A[:, i + 1] = x2 A = np.concatenate((A.real, A.imag), axis=0) cnew = la.lstsq(A, carr)[0] cnew = np.array(cnew, dtype=complex) # recast cnew to complex values for i, p in enumerate(pairs): if p: cnew[i : i + 2] = np.array( [cnew[i] + 1j * cnew[i + 1], cnew[i] - 1j * cnew[i + 1]] ) return cnew def trialFunFit_constrained_2d(self, s, arr2d, alphas, pairs): print(">>> multifun fit test v2 <<<") deg = len(alphas) # construct f array arr1d = np.array([], dtype=complex) for ind, arr in enumerate(arr2d): arr1d = np.concatenate((arr1d, arr)) # construct matrix A ns = len(s) ncols = (len(arr2d) + 1) * deg nrows = len(arr1d) A = np.zeros((nrows, ncols), dtype=complex) for ind, fis in enumerate(arr1d): indA = int(ind / ns) A[ind, deg * indA : deg * (indA + 1)] = 1.0 / (s[ind % ns] + alphas) # try: # A[ind,deg*indA:deg*(indA+1)] = 1./(s[ind%ns] + alphas) # except ValueError: # print indA # print deg*indA # print deg*(indA+1) # print ncols A[ind, -deg:] = -fis / (s[ind % ns] + alphas) # implement the constraint for j in range(len(arr2d) + 1): for i, p in enumerate(pairs): if p: x1 = A[:, j * deg + i] + A[:, j * deg + i + 1] x2 = 1j * (A[:, j * deg + i] - A[:, j * deg + i + 1]) A[:, j * deg + i] = x1 A[:, j * deg + i + 1] = x2 A = np.concatenate((A.real, A.imag), axis=0) arr1d = np.concatenate((arr1d.real, arr1d.imag)) # find auxiliary residues c = la.lstsq(A, arr1d)[0][-len(alphas) :] print("cnew: ", c) # find zeros of fitted auxiliary function a = np.diag(alphas) b = np.ones(deg) # implement similarity transform for i, p in enumerate(pairs): if p: a[i : i + 2, i : i + 2] = np.array( [ [alphas[i].real, alphas[i].imag], [-alphas[i].imag, alphas[i].real], ] ) b[i : i + 2] = np.array([2, 0]) # compute zeros of sum sigmafit H = a.real - np.dot(b[:, None], c[None, :]) print("H: ", H) alphanew = np.linalg.eig(H)[0] print("alphanew: ", alphanew) inds = np.argsort(alphanew) alphanew = alphanew[inds] # indicates where pairs of complex conjugate poles occur auxarr = np.abs( (np.abs(alphanew[:-1]) - np.abs(alphanew[1:])) / np.abs(alphanew[:-1]) ) auxarr2 = np.abs(alphanew.imag) > 1e-15 # np.abs(alphas.imag) > 1e-15 pairs = np.logical_and( np.concatenate((auxarr < 1e-15, np.zeros(1, dtype=bool))), auxarr2 ) # find residues # compute matrix for residue calculation Anew = 1.0 / (s[:, None] + alphanew[None, :]) for i, p in enumerate(pairs): if p: x1 = Anew[:, i] + Anew[:, i + 1] x2 = 1j * (Anew[:, i] - Anew[:, i + 1]) Anew[:, i] = x1 Anew[:, i + 1] = x2 Anew = np.concatenate((Anew.real, Anew.imag), axis=0) # compute residues c2dnew = np.zeros((arr2d.shape[0], deg), dtype=complex) for ind, arr in enumerate(arr2d): carr = np.concatenate((arr.real, arr.imag)) cnew = la.lstsq(Anew, carr)[0] cnew = np.array(cnew, dtype=complex) # recast cnew to complex values for i, p in enumerate(pairs): if p: cnew[i : i + 2] = np.array( [cnew[i] + 1j * cnew[i + 1], cnew[i] - 1j * cnew[i + 1]] ) c2dnew[ind, :] = cnew print("cnew: ", c2dnew) return alphanew, c2dnew, pairs def reduceSeries(self, s, y, a, c, pairs=None, rtol=1e-2, pprint=False): """ reduce the series of exponentials after the fitting """ k = 1 rms = 1.0 # ensure stability of approximation inds = np.where(a.real > 0.0)[0] a = a[inds] c = c[inds] if pairs is not None: pairs = pairs[inds] # construct indices for ranking the exponentials pairs_alltrue = copy.copy(pairs) for i, p in enumerate(pairs): if p: pairs_alltrue[i + 1] = True magnitudes = np.zeros(a.shape) for i in range(len(pairs_alltrue)): if pairs_alltrue[i]: c_ = c[i].real c__ = c[i].real a_ = a[i].real a__ = a[i].real magnitudes[i] = (c_ * a_ + c__ * a__) / (a_**2 + a__**2) else: magnitudes[i] = c[i].real / a[i].real sortind = np.argsort(np.abs(magnitudes))[::-1] anew = copy.copy(a[sortind]) alphas = anew cnew = copy.copy(c[sortind]) gammas = cnew # look for pairs to be sure they are correct auxarr = np.abs( (np.abs(alphas[:-1]) - np.abs(alphas[1:])) / np.abs(alphas[:-1]) ) auxarr2 = np.abs(alphas.imag) > 1e-15 pairs = np.logical_and( np.concatenate((auxarr < 1e-15, np.zeros(1, dtype=bool))), auxarr2 ) npairs = copy.copy(pairs) approx = self.sumFExp(s, alphas, gammas) while rms > rtol and k < len(a) + 1: if (pairs is not None) and pairs[k - 1]: k += 1 alphas = anew[0:k] gammas = cnew[0:k] auxarr = np.abs( (np.abs(alphas[:-1]) - np.abs(alphas[1:])) / np.abs(alphas[:-1]) ) auxarr2 = np.abs(alphas.imag) > 1e-15 npairs = np.logical_and( np.concatenate((auxarr < 1e-15, np.zeros(1, dtype=bool))), auxarr2 ) approx = self.sumFExp(s, alphas, gammas) rms = np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) k += 1 if pprint: pairinds = copy.copy(npairs) inds = np.where(npairs)[0] for i in inds: pairinds[i + 1] = True inds = np.where(np.logical_not(pairinds))[0] if len(inds) > 0: if np.max(np.abs(alphas[inds].imag)) > 1e-6: print("!!! Warning: invalid pairs !!!") print("original alphas: ", anew) print("original gammas: ", cnew) print("original pairs: ", pairs) print("new alphas: ", alphas) print("new gammas: ", gammas) print("new pairs: ", npairs) return alphas, gammas, rms, approx, npairs def _find_start_nodes(self, s, deg, realpoles, initpoles): if not isinstance(initpoles, str): trialpoles = initpoles.astype(complex) elif initpoles == "lin": trialpoles = np.linspace(s[int(len(s) / 2.0) + 1].imag, s[-1].imag, deg) elif initpoles == "log10": trialpoles = np.logspace(1, np.log10(s[-1].imag), num=deg, base=10) elif initpoles == "log": trialpoles = np.logspace(1, np.log(s[-1].imag), num=deg, base=math.e) elif initpoles == "random": trialpoles = s[-1].imag * np.random.rand(deg) else: raise Exception("initpoles invalid") if realpoles: pairs = np.zeros(trialpoles.shape, dtype=bool) else: trialpoles = np.array( [[tp + 1j * tp, tp - 1j * tp] for tp in trialpoles] ).flatten() pairs = np.array([[True, False] for _ in range(deg)]).flatten() return trialpoles, pairs def _run_fit( self, s, y, trialpoles, pairs, rtol, maxiter, constrained, zerostart, pole_flip=True, pprint=True, ): """ performs iterations of the actual fitting process """ k = 0 rms = rtol + 1.0 l = 0 m = 0 alist = [] clist = [] rmslist = [] pairslist = [] trialpoles_orig = copy.copy(trialpoles) pairs_orig = copy.copy(pairs) if constrained: while rms > rtol and k < maxiter: a, c, pairs = self.trialFunFit_constrained( s, y, trialpoles, pairs, zerostart=zerostart ) approx = self.sumFExp(s, a, c) rms = np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) # if unstable poles, make sure to run again # if np.min(a) < 0.: # rms = rtol + 1. # if m < 10.: # if pole_flip: ind = np.where(a < 0.0)[0] # where poles are unstable if len(ind) > 0: a[ind] *= -1.0 c = self.fit_residues(s, y, a, pairs) approx = self.sumFExp(s, a, c) rms = np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) # if rms < rtol: # alist.append(copy.deepcopy(a)); clist.append(copy.deepcopy(c)); rmslist.append(rms); pairslist.append(pairs) # else: # ind = np.where(a > 0.)[0] # where poles are stable # newpole, newpair = self._find_start_nodes(s, len(a)-len(ind), True, 'random') # trialpoles = np.concatenate((a[ind], newpole)) # pairs = np.concatenate((pairs[ind], newpair)) # else: # trialpoles, pairs = self._find_start_nodes(s, len(trialpoles_orig), True, 'random') # m = 0 # l += 1; m += 1 # else: alist.append(copy.deepcopy(a)) clist.append(copy.deepcopy(c)) rmslist.append(rms) pairslist.append(pairs) trialpoles = copy.copy(a) k += 1 if pprint and l > 5: print("Often found unstable poles (" + str(l) + " times)") return alist, clist, rmslist, pairslist else: while rms > rtol and k < maxiter: a, c, _ = self.trialFunFit(s, y, trialpoles, zerostart) approx = self.sumFExp(s, a, c) rms = np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) trialpoles = a k += 1 return alist, clist, rmslist, None def _run_fit_vector(self, s, ys, trialpoles, pairs, rtol, maxiter): # eps = 2. # k = 0; rms = 1. # rms_ = rms # alist = []; clist = []; rmslist = []; pairslist = [] # while rms > rtol and k < maxiter: # a, c2d, pairs = self.trialFunFit_constrained_2d(s, ys, trialpoles, pairs) # rms = 0. # for ind, y in enumerate(ys): # approx = self.sumFExp(s, a, c2d[ind]) # rms += np.sqrt(((np.abs(approx-y) / np.max(np.abs(y)))**2).sum() / len(y)) # alist.append(copy.deepcopy(a)); clist.append(copy.deepcopy(c2d)); rmslist.append(rms); pairslist.append(pairs) # # randomize poles a bit # skip = False # tp = copy.deepcopy(a) # if (rms_ - rms) / rms_ < eps: # for i, p in enumerate(pairs): # if not skip: # if p: # x1 = 0.1 * tp[i].real; x2 = 0.1 * np.abs(tp[i].imag) # r1 = x1 * (2. * np.random.rand() - 1); r2 = x2 * (2. * np.random.rand() - 1) # tp[i:i+2] = np.array([tp[i] + r1 + 1j*r2, tp[i+1] + r1 - 1j*r2]) # skip = True # else: # x = 0.1 * tp[i] # r = x * (2. * np.random.rand() - 1) # tp[i] += r # skip = False # trialpoles = tp # k += 1 # rms_ = rms eps = 2.0 k = 0 rms = 1.0 rms_ = rms alist = [] clist = [] rmslist = [] pairslist = [] while rms > rtol and k < maxiter: a2d = np.zeros((len(ys), len(trialpoles)), dtype=complex) c2d = np.zeros((len(ys), len(trialpoles)), dtype=complex) pairs2d = np.zeros((len(ys), len(trialpoles)), dtype=bool) for ind, y in enumerate(ys): a2d[ind], c2d[ind], pairs2d[ind] = self.trialFunFit_constrained( s, y, trialpoles, pairs ) # put complex conjugates with positive part first for i, p in enumerate(pairs2d[ind]): if p: if a2d[ind, i] < 0: a2d[ind, i] = a2d[ind, i].real - 1j * a2d[ind, i].imag a2d[ind, i + 1] = ( a2d[ind, i + 1].real - 1j * a2d[ind, i + 1].imag ) c2d[ind, i] = c2d[ind, i].real - 1j * c2d[ind, i].imag c2d[ind, i + 1] = ( c2d[ind, i + 1].real - 1j * c2d[ind, i + 1].imag ) a, pairs = self._Kmeans(a2d, pairs2d) c2d = np.zeros((len(ys), len(a)), dtype=complex) for ind, y in enumerate(ys): c2d[ind] = self.fit_residues(s, y, a, pairs) approx = self.sumFExp(s, a, c2d[ind]) rms += np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) alist.append(copy.deepcopy(a)) clist.append(copy.deepcopy(c2d)) rmslist.append(rms) pairslist.append(pairs) # randomize poles a bit skip = False tp = copy.deepcopy(a) if (rms_ - rms) / rms_ < eps: for i, p in enumerate(pairs): if not skip: if p: x1 = 0.1 * tp[i].real x2 = 0.1 * np.abs(tp[i].imag) r1 = x1 * (2.0 * np.random.rand() - 1) r2 = x2 * (2.0 * np.random.rand() - 1) tp[i : i + 2] = np.array( [tp[i] + r1 + 1j * r2, tp[i + 1] + r1 - 1j * r2] ) skip = True else: x = 0.1 * tp[i] r = x * (2.0 * np.random.rand() - 1) tp[i] += r skip = False trialpoles = tp k += 1 return alist, clist, rmslist, pairslist def _Kmeans( self, a2d, pairs2d ): # do the kmeans algorithm to make sure all nodes are the same a1d = np.array([], dtype=complex) for i, a in enumerate(a2d): # determine the coefficients not to take into account in the algorithm paux = np.concatenate((np.array([False]), pairs2d[i, :-1])) inds = np.where(np.logical_not(paux))[0] a1d = np.concatenate((a1d, a[inds])) adata = np.concatenate((a1d.real[:, None], a1d.imag[:, None]), 1) astart = np.concatenate( (a2d[-1].real[inds][:, None], a2d[-1].imag[inds][:, None]), 1 ) a = kmeans(adata, astart)[0] # check for complex conjugates anew = [] pairsnew = [] for alpha in a: if np.abs(alpha[1]) > 1e-9: anew.append(alpha[0] + 1j * alpha[1]) anew.append(alpha[0] - 1j * alpha[1]) pairsnew.append(True) pairsnew.append(False) else: anew.append(alpha[0] + 1j * 0.0) pairsnew.append(False) # a = a[:,0] + 1j* a[:,1] # look for pairs to be sure they are correct # auxarr = np.abs((np.abs(a[:-1]) - np.abs(a[1:])) / np.abs(a[:-1])) # auxarr2 = np.abs(a.imag) > 1e-15 # pairs = np.logical_and(np.concatenate((auxarr < 1e-15, np.zeros(1, dtype=bool))), auxarr2) return np.array(anew), np.array(pairsnew) def reduceNumExp(self, s, y, a, c, pairs, lim=0.1, pprint=True, pplot=True): """ pools the short timescale exponentials """ # find inds of exponentials that have to be taken together inds = np.where(np.abs(a.real) > (1e3 / lim))[0] # the other indices stay the same inds_no = np.where(np.abs(a.real) <= (1e3 / lim))[0] anew = a[inds_no] cnew = c[inds_no] pairsnew = pairs[inds_no] if len(inds) > 1: amin = np.min(a[inds]) EF = ExpFitter() if pplot == True: import matplotlib.pyplot as pl y_f_full = self.sumFExp(s, a, c) y_f_part = self.sumFExp(s, a[inds], c[inds]) pl.figure("reduceNumExp problem") pl.plot(s.imag, y_f_full.real, "r") pl.plot(s.imag, y_f_part.real, "b") pl.plot(s.imag, y_f_full.imag, "r--") pl.plot(s.imag, y_f_part.imag, "b--") pl.show() # multiple step approach t = np.linspace(0.0, 5.0 / amin.real, 1000) y_t = EF.sumExp(t, -a[inds], c[inds]) y_t_full = EF.sumExp(t, -a, c) A_t = -np.sum( c[inds] * (np.exp(-a[inds] * t[-1]) - np.exp(-a[inds] * t[0])) / a[inds] ) y_t_lim = EF.sumExp(np.array([lim * 1e-3]), -a[inds], c[inds]) y_t_lim_full = EF.sumExp(np.array([lim * 1e-3]), -a, c) # ~ print 'full sum at 1ms: ', y_t_lim_full # ~ print 'partial sum at 1ms: ', y_t_lim # ~ print 'max full sum: ', np.max(y_t_full[1:]) # fit first outside of first timestep if necessary if amin.real < (2e4 / lim) and np.abs( y_t_lim_full - y_t_lim ) > 0.001 * np.max(y_t_full[1:]): t_out = np.linspace(lim * 1e-3, 5.0 / amin.real, 1000.0) y_out = EF.sumExp(t_out, -a[inds], c[inds]) A_out = -np.sum( c[inds] * (np.exp(-a[inds] * t_out[-1]) - np.exp(-a[inds] * t_out[0])) / a[inds] ) try: # if the maximum of the to be grouped exponentials is past lim, # we use two exponentials, otherwise one # ~ else: A, C, _ = EF.fitExp( t_out, y_out, deg=1, rtol=0.0001, surface=True, A=A_out ) A = -A.real C = C.real Ptemp = [False] except ValueError: A = np.array([amin.real]) C = A_out / ((np.exp(-A * t_out[-1]) - np.exp(-A * t_out[0])) / A) Ptemp = [False] # check if we need to fit inside first timestep t_in = np.linspace(0.0, lim * 1e-3, 100) y_in = EF.sumExp(t_in, -a[inds], c[inds]) - EF.sumExp(t_in, -A, C) A_in_full = -np.sum( c[inds] * (np.exp(-a[inds] * t_in[-1]) - np.exp(-a[inds] * t_in[0])) / a[inds] ) A_in = -np.sum(C * (np.exp(-A * t_in[-1]) - np.exp(-A * t_in[0])) / A) if np.abs(A_in - A_in_full) < 0.01 * np.abs(A_in_full): # we don't need to fit an extra exponential, # but just rescale surfaces a bit A_tot = np.sum(c[inds] / a[inds]) A_part = np.sum(C / A) C = C * A_tot / A_part P = np.array(Ptemp, dtype=bool) else: # we need to fit an extra exponential t = np.linspace(0.0, 3.0 / amin.real, 1000.0) A_t = np.sum(c[inds] / a[inds]) A_exp1 = np.sum(C / A) A2 = np.array([1e4 / lim], dtype=complex) C2 = (A_t - A_exp1) * A2 P = np.array(Ptemp + [False], dtype=bool) A = np.concatenate((A, A2)) C = np.concatenate((C, C2)) else: # we can just fit inside the first timestep # construct new exponential naively A = np.array([amin.real], dtype=complex) C = np.sum(c[inds] / a[inds]) * A P = np.array([False], dtype=bool) # concatenate the arrays anew = np.concatenate((anew, A)) cnew = np.concatenate((cnew, C)) pairsnew = np.concatenate((pairsnew, P)) if pprint or pplot: t = np.linspace(0.0, 0.050, 100000) A_original = -np.sum(c * (np.exp(-a * t[-1]) - np.exp(-a * t[0])) / a) A_new = -np.sum( cnew * (np.exp(-anew * t[-1]) - np.exp(-anew * t[0])) / anew ) if np.abs(A_original - A_new) > 1e-12 or np.isnan(A_new.real): print("!!! Warning: surfaces under kernels not equal !!!") print("oringal surface: ", A_original) print("new surface: ", A_new) print("all a's: ", a) print("all gamma's: ", c) print("all pairs: ", pairs) print("tbg a's: ", a[inds]) print("tbg gamma's: ", c[inds]) print("tbg pairs: ", pairs[inds]) print("ntbg a's: ", a[inds_no]) print("ntbg gamma's: ", c[inds_no]) print("ntbg pairs: ", pairs[inds_no]) print("new a's: ", anew) print("new c's: ", cnew) print("new pairss: ", pairsnew) if pplot and (np.abs(A_original - A_new) > 1e-12 or np.isnan(A_new.real)): # ~ if pplot: t = np.linspace(0.0, 0.050, 100000) dt = t[1] - t[0] ef = ExpFitter() se_ = ef.sumExp(t, -a[inds], c[inds]) e_ = ef.sumExp(t, -A, C) se = ef.sumExp(t, -a, c) e = ef.sumExp(t, -anew, cnew) print( "integral original reduced: ", -np.sum( c[inds] * (np.exp(-a[inds] * t[-1]) - np.exp(-a[inds] * t[0])) / a[inds] ), ) print( "integral fit reduced: ", -np.sum(C * (np.exp(-A * t[-1]) - np.exp(-A * t[0])) / A), ) print("final a's :", anew) print("new a's :", A) import matplotlib.pyplot as pl pl.figure("reduce_exp problem") pl.plot(t * 1000, se, "r", label="original kernel") pl.plot(t * 1000, e, "b", label="new kernel") pl.plot(t * 1000, se_, "r--", label="exps to be reduced") pl.plot(t * 1000, e_, "b--", label="to be reduced exp") pl.legend(loc=0) pl.show() # new approximation and rmse approx = self.sumFExp(s, anew, cnew) rms = np.sqrt(((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y)) return anew, cnew, rms, approx, pairsnew def fitFExp_increment( self, s, y, rtol=1e-2, maxiter=20, maxiter_step=3, realpoles=True, constrained=True, zerostart=False, pprint=True, ): # find the start nodes trialpoles, pairs = self._find_start_nodes(s, 1, realpoles, "log10") for k in range(maxiter): alist, clist, rmslist, pairslist = self._run_fit( s, copy.copy(y), trialpoles, pairs, rtol, maxiter_step, constrained, zerostart, pole_flip=True, pprint=pprint, ) indmin = np.argmin(np.array(rmslist)) alpha = alist[indmin] gamma = clist[indmin] rms = rmslist[indmin] pairs = pairslist[indmin] if rms < rtol: break else: if realpoles: # alphanew = [s[-1].imag * np.random.rand()] alphanew = [np.random.choice(1.0 / s.imag)] pairsnew = [False] else: areal = s[-1].imag * np.random.rand() aimag = s[-1].imag * np.random.rand() alphanew = [areal + 1j * aimag, areal - 1j * aimag] pairsnew = [True, False] trialpoles = np.array(alpha.tolist() + alphanew) # print trialpoles pairs = np.array(pairs.tolist() + pairsnew) rmsfinal = rms if pprint and rmsfinal > rtol: print("Target accuracy was not reached") return alpha, gamma, pairs, rmsfinal def fitFExp( self, s, y, deg=20, rtol=1e-2, maxiter=5, lim=None, realpoles=True, initpoles="lin", zerostart=False, constrained=True, reduce_numexp=False, return_real=False, lower_limit=None, ): """ Fits a function in fourrierspace by a series of fourrier transformed exponentials. input: -args [s]: numpy array of frequencies (imaginary) at which value function is evaluated [y]: numpy array of complex function values -kwargs [deg]: int, number of exponential terms used (real number is dubbeled if realpoles=False) [rtol]: float, relative toleranse after which iterations stop [maxiter]: int, maximum number of iterations [lim]: float, smallest timescale to take into account [ms], if not None, the algorithm fits the slowest timescale first, then the next slowest, etc. !!Use only for decaying transfer functions!! [realpoles]: boolean, use real starting poles if true, use complex conjugate poles if false [initpoles]: numpy array or str 'lin' for linearly spaced initial poles, 'log10' and 'log' for logarithmically spaced poles. If a numpy array is given, those values will be the initial polez (Hz). [zerostart]: boolean, constrain the function to be 0 at t=0 if true [constrained]: fix the poles to be complex conjugate pairs [reduce_numexp]: boolean, pool short time scale exponentials together if true [return_real]: boolean, fix the output to only contain real poles if true [lower_limit]: float, if not None, lower limit on the exponential prefactors output: [alpha]: numpy array of (complex) timescales of exponentials [gamma]: numpy array of complex magnitudes of exponentials [pairs]: boolean array that indicates True at every index where a complex conjugate pair occurs [rms]: float, root mean square error """ trialpoles, pairs = self._find_start_nodes(s, deg, realpoles, initpoles) if lim != None: a_s = [] c_s = [] pair_s = [] y_decr = copy.copy(y) deg_decr = deg keep_going = True count = 0 while keep_going: alist, clist, rmslist, pairslist = self._run_fit( s, y_decr, trialpoles, pairs, rtol, maxiter, constrained, zerostart ) indmin = np.argmin(np.array(rmslist)) anew, cnew, rmsnew, approx, pairsnew = self.reduceSeries( s, y_decr, alist[indmin], clist[indmin], pairs=pairslist[indmin], rtol=rtol, ) if count == 0: # save parameters for later purposes asave = copy.copy(anew) csave = copy.copy(cnew) rmssave = rmsnew pairssave = pairsnew surface_original = np.sum(cnew / anew) ind = [] # take the longest timescale out ind.append(np.argmin(anew.real)) if pairsnew[ind]: ind.append(ind[0] + 1) a_tba = anew[ind] c_tba = cnew[ind] pair_tba = pairsnew[ind] surface = np.sum(cnew / anew) y_old = copy.copy(y_decr) y_decr = self.sumFExp(s, anew, cnew) - self.sumFExp( s, anew[ind], cnew[ind] ) # ~ deg_decr -= len(ind) trialpoles, pairs = self._find_start_nodes(s, deg_decr, True, initpoles) # stop if timescale is small enough if anew[ind][0] > 1e3 / (0.2 * lim): if len(ind) == 1: c_tba = surface * a_tba elif len(ind) == 2: c_tba[0] = ( surface * (a_tba[0].real ** 2 + a_tba[0].imag ** 2) / (2.0 * a_tba[0].real) ) c_tba[1] = c_tba[0] else: raise ValueError("invalid array length") keep_going = False # stop if rmse is small enough approx = self.sumFExp(s, a_tba, c_tba) rms = np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) if rms < rtol: keep_going = False # append new parameters to lists a_s += a_tba.tolist() c_s += c_tba.tolist() pair_s += pair_tba.tolist() # stop if to many parameters if count >= 9.0: keep_going = False count += 1 # for returning alpha = np.array(a_s, dtype=complex) gamma = np.array(c_s, dtype=complex) pairs = np.array(pair_s, dtype=bool) approx = self.sumFExp(s, alpha, gamma) rms = np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) # check whether it was better to go with the first parameters if len(asave) < len(alpha) and rmssave < rtol: alpha = asave gamma = csave pairs = pairssave approx = self.sumFExp(s, alpha, gamma) rms = np.sqrt( ((np.abs(approx - y) / np.max(np.abs(y))) ** 2).sum() / len(y) ) surface_after = np.sum(gamma / alpha) if np.abs(surface_original - surface_after) > rtol * surface_original: print("surface original: ", surface_original) print("surface after: ", surface_after) if np.min(alpha.real) < 0.0: print("!!!owowow!!!") else: alist, clist, rmslist, pairslist = self._run_fit( s, y, trialpoles, pairs, rtol, maxiter, constrained, zerostart ) indmin = np.argmin(np.array(rmslist)) alpha, gamma, rms, approx, pairs = self.reduceSeries( s, y, alist[indmin], clist[indmin], pairs=pairslist[indmin], rtol=rtol ) if reduce_numexp: alpha, gamma, rms, approx, pairs = self.reduceNumExp( s, y, alpha, gamma, pairs, pplot=False ) if return_real: inds = np.where(np.roll(pairs, 1))[0] alpha_real = np.delete(alpha.real, inds) alpha = alpha_real + 0j pairs = np.zeros(alpha.size, dtype=bool) gamma = self.fit_residues(s, y, alpha, pairs) if lower_limit: inds = np.where(alpha.real < lower_limit)[0] alpha = np.delete(alpha, inds) pairs = np.delete(pairs, inds) gamma = self.fit_residues(s, y, alpha, pairs) return alpha, gamma, pairs, rms def fitFExp_vector( self, s, ys, deg=20, rtol=1e-2, maxiter=5, extra_startpoles=[], extra_startpoles_pairs=[], realpoles=True, initpoles="lin", reduce_series=False, ): """ Fit multiple data-arrays in Fourrier-domain simultaneously with a shared set of nodes input: [s]: numpy array of complex number, frequencies of data [ys]: numpy ndarray of complex numbers, rows are different data-arrays [deg]: int, the starting number of nodes [rtol]: float, the relative tolercance at which to stop [maxiter]: int, the maximal number of iterations after which to stop when rtol is not reached [extra_startpoles]: numpy array of complex number, additional initial poles [extra_startpoles_pairs]: numpy bolean array, indicates complex conjugate pairs associated with the extra initial poles [realpoles]: boolean, if True the starting poles are real, if false the starting poles are complex conjugates (and then the real degree is 2*deg) [initpoles]: string specifying how the initial poles are distributed, choices are 'lin', 'log' and 'log10' [reduce_series]: boolean, whether to delete expontentials of small influence after the fitting output: [alpha]: complex numpy array of exponential coefficients [gamma]: 2d complex numpy array, each row contains the residues corresponding to the respective data arrays [pairs]: boolean numpy array, indicates where a pair of complex conjugate exponentials occurs [rms]: float, aggregated root mean square error """ trialpoles, pairs = self._find_start_nodes(s, deg, realpoles, initpoles) if len(extra_startpoles) > 0: trialpoles = np.concatenate((trialpoles, extra_startpoles)) pairs = np.concatenate((pairs, extra_startpoles_pairs)) alist, clist, rmslist, pairslist = self._run_fit_vector( s, ys, trialpoles, pairs, rtol, maxiter ) indmin = np.argmin(np.array(rmslist)) if reduce_series: # reduce the number of exponentials for each function separately alpha_arr = np.array([], dtype=complex) rms = 0.0 for ind, c in enumerate(clist[indmin]): alpha, gamma, rms_ind, approx, pair = self.reduceSeries( s, ys[ind], alist[indmin], c, pairs=pairslist[indmin], rtol=rtol ) rms += rms_ind alpha_arr = np.concatenate((alpha_arr, alpha)) alpha_arr = np.unique(alpha_arr) # search positions of common alphas asortind = np.argsort(alist[indmin]) alphapos = np.searchsorted(alist[indmin][asortind], alpha_arr) inds = asortind[alphapos] return ( alist[indmin][inds], clist[indmin][:, inds], pairslist[indmin][inds], rms, ) else: return alist[indmin], clist[indmin], pairslist[indmin], rmslist[indmin] def create_logspace_freqarray(fmax=7, base=10, num=200): a = np.logspace(1, fmax, num=num, base=base) b = np.linspace(-base, base, num=num // 2 + 1) # b = np.linspace(-base, base, num=num/2+(num/2)%2)[:-1] return 1j * np.concatenate((-a[::-1], b[1:-1], a))
[docs] class FourierQuadrature(object): """ Performs an accurate Fourrier transform on functions evaluated at a given array of temporal grid points Parameters ---------- tarr: `np.array` of floats, the time points (ms) at which the function is evaluated, have to be regularly spaced fmax: float, optional (default ``7.``) the maximum value to which the logarithm is evaluated to get the maximum evaluation frequency base: float, optional (defaul ``10``) the base of the logarithm used to generated the logspace num: int, even, optional (default ``200``) Number of points. the eventual number of points in frequency space is (2+1/2)*num Attributes ---------- s: np.array of complex The frequencies at which input arrays in the Fourrier domain are supposed to be evaluated t: np.array of real The time array at which input arrays in the time domain are supposed to be evaluated ind_0s: int Index of the zero frequency component in `self.s` """ def __init__(self, tarr, fmax=7, base=10, num=200): assert num % 2 == 0 # create the frequency points at which to evaluate the transform self.s = create_logspace_freqarray(fmax=fmax, base=base, num=num) self.t = tarr self.ind_0s = len(self.s) // 2 # create the quadrature matrix self._setQuad() self._setQuadInv() def _setQuad(self): s = self.s t = self.t N = len(t) dt = (t[1] - t[0]) * 1e-3 c = np.zeros((len(s), N), dtype=complex) Nr = np.arange(1, N - 1)[np.newaxis, :] sc = s[:, np.newaxis] mask_arr = np.abs(sc) > 1e-12 # first frequency integral np.divide(np.exp(-sc * dt) - 1.0, -sc * dt, out=c[:, 0:1], where=mask_arr) np.divide(-1.0 + c[:, 0:1], -sc, out=c[:, 0:1], where=mask_arr) c[np.where(np.logical_not(mask_arr))[0], 0] = dt / 2.0 # middle integrals np.divide( np.exp(-sc * dt * (Nr + 1)) - 2.0 * np.exp(-sc * dt * Nr) + np.exp(-sc * dt * (Nr - 1)), sc**2 * dt, out=c[:, 1:-1], where=mask_arr, ) c[np.where(np.logical_not(mask_arr))[0], 1:-1] = dt # last frequency integral np.divide( np.exp(-sc * dt * N) - np.exp(-sc * dt * (N - 1)), -sc * dt, out=c[:, N - 1 : N], where=mask_arr, ) np.divide( np.exp(-sc * dt * N) - c[:, N - 1 : N], -sc, out=c[:, N - 1 : N], where=mask_arr, ) c[np.where(np.logical_not(mask_arr))[0], N - 1] = dt / 2.0 self.c = c def _setQuadInv(self): t = self.t[:, np.newaxis] * 1e-3 s = self.s[np.newaxis, :] ic = np.zeros((len(self.t), len(self.s)), dtype=complex) mask_arr = np.abs(t) > 1e-12 # compute integrals I1 = np.divide( np.exp(s[:, 1:] * t) - np.exp(s[:, :-1] * t), 1j * t, out=np.zeros_like(ic[:, :-1]), where=mask_arr, ) I1[np.where(np.logical_not(mask_arr))[0], :] = (s[:, 1:] - s[:, :-1]) / 1j I2_ = np.divide( np.exp(s[:, 1:] * t) - np.exp(s[:, :-1] * t), 1j * t, out=np.zeros_like(ic[:, :-1]), where=mask_arr, ) # I2_[0,:] = s[:,1:] - s[:,:-1] / 1j I2 = np.divide( (s[:, 1:] - s[:, :-1]).imag * np.exp(s[:, 1:] * t) - I2_, 1j * t, out=np.zeros_like(ic[:, :-1]), where=mask_arr, ) # I2[0:1,:] = s[:,1:] * (s[:,1:] - s[:,:-1]) # compute matrix elements ic[:, 0] = I1[:, 0] - I2[:, 0] / (s[:, 1] - s[:, 0]).imag ic[:, 1:-1] = ( I1[:, 1:] - I2[:, 1:] / (s[:, 2:] - s[:, 1:-1]).imag + I2[:, :-1] / (s[:, 1:-1] - s[:, :-2]).imag ) ic[:, -1] = I2[:, -1] / (s[:, -1] - s[:, -2]).imag self.ic = ic / (2.0 * np.pi)
[docs] def __call__(self, arr): """ Evaluate the Fourrier transform of `arr` Parameters ---------- arr: `np.array` Should have the same length as `self.t` Returns ------- s: `np.array` the frequency points at which the Fourrier transform is evaluated (in Hz) farr: `np.array` the Fourrier transform of `arr` """ farr = np.dot(self.c, arr) return self.s, farr
[docs] def ft(self, arr): """ Evaluate the Fourrier transform of `arr` Parameters ---------- arr: `np.array` Should have the same length as `self.t` Returns ------- s: `np.array` the frequency points at which the Fourrier transform is evaluated (in Hz) farr: `np.array` the Fourrier transform of `arr` """ return self(arr)
[docs] def ft_inv(self, arr): """ Evaluate the inverse Fourrier transform of `arr` Parameters ---------- arr: `np.array` Should have the same length as `self.s` Returns ------- t: `np.array` the time points at which the inverse Fourrier transform is evaluated (in ms) tarr: `np.array` the Fourrier transform of `arr` """ tarr = np.dot(self.ic, arr) return self.t, tarr
class expExtractor(object): def __call__(self, N=1, recalc=False, atol=5e-2, pprint=True): """ Fit a partial fraction decomposition to the obtained kernel in the frequency domain input: N (int): the number of exponenstials to use recalc (bool): whether to force a refit if a fit with the given N already exists atol (float): the tolerance, if the RMSE of the fit is higher and pprint is True a warning is printed pprint (bool): whether to print the warning output: kfit (dict): {'a': array of the exponental factors (1/tau), 'c': array of the coefficients, 'p': idicators whether or not a given exponential is part of a complex conjugate pair} """ if N not in self.kfit or recalc: FEF = fExpFitter() ak, ck, pk, rms = FEF.fitFExp( self.s_f, self.k_f, rtol=1e-2, deg=N, maxiter=20, initpoles="log10", realpoles=True, zerostart=False, constrained=True, reduce_numexp=False, ) ak *= 1e-3 # ck *= 1e-3 # convert units to ms # also try time domain approach if N < 10: EF = ExpFitter() if "k_t" not in self.__dict__: k_t = EF.sumExp(self.tarr, -self.kfit[30]["a"], self.kfit[30]["c"]) else: k_t = self.k_t ak_, ck_, rms_ = EF.PronyExpFit(N, self.tarr, k_t) ak_ = -ak_ pk_ = np.zeros(ak_.shape, dtype=bool) # go for the approach with the lowest error rms = np.sqrt( ((k_t - EF.sumExp(self.tarr, -ak, ck)) ** 2).sum() / len(k_t) ) if rms > rms_: ak = ak_ ck = ck_ pk = pk_ rms = rms_ if rms > atol and pprint: print("No sane fit achieved for N=" + str(N)) print("RMSE:", rms) self.kfit[N] = {"a": ak, "c": ck, "p": pk} return self.kfit[N] def fit_vector(self, N, atol=5e-2, pprint=True, store=True): FEF = fExpFitter() ak, ck, pk, rms = FEF.fitFExp( self.s_f, self.k_f, rtol=1e-2, deg=N, maxiter=20, initpoles="log10", realpoles=True, zerostart=False, constrained=True, reduce_numexp=False, ) ak *= 1e-3 # ck *= 1e-3 # convert units to ms if rms > atol and pprint: print("No sane fit achieved for N=" + str(N)) print("RMSE:", rms) res = {"a": ak, "c": ck, "p": pk} if store: self.kfit[N] = res return res def fit_prony(self, N, atol=5e-2, pprint=True, store=True): EF = ExpFitter() if "k_t" not in self.__dict__: k_t = EF.sumExp(self.tarr, -self.kfit[30]["a"], self.kfit[30]["c"]) else: k_t = self.k_t ak, ck, rms = EF.PronyExpFit(N, self.tarr, k_t) ak *= -1.0 # ck *= 1e-3 pk = np.zeros(ak.shape, dtype=bool) if rms > atol and pprint: print("No sane fit achieved for N=" + str(N)) print("RMSE:", rms) res = {"a": ak, "c": ck, "p": pk} if store: self.kfit[N] = res return res def k_freq(self, N): self(N) FEF = fExpFitter() return FEF.sumFExp(self.s_f, self.kfit[N]["a"] * 1e3, self.kfit[N]["c"]) def k_time(self, N): self(N) EF = ExpFitter() return EF.sumExp(self.tarr, -self.kfit[N]["a"], self.kfit[N]["c"]) class IzExtractor(expExtractor): def __init__(self, Iz, yarr): self.tarr = Iz FT = FourierQuadrature(self.tarr) self.s_f, self.k_f = FT(yarr) # for storing kernel fit results self.kfit = {} # initial fit self(30) class simpleExpExtractor(expExtractor): def __init__(self, arr, tarr): dt = tarr[1] - tarr[0] self.tarr = tarr self.k_t = arr # create a smoothing window if arr hasn't gone to zero yet if arr[-1] > 1e-9: vwindow = np.cos((np.pi / 2.0) * (self.tarr / self.tarr[-1])) else: vwindow = np.ones(self.tarr.shape) # compute Fourrier transform FT = FourierQuadrature( self.tarr, fmax=np.log10(1.0 / (dt * 1e-3)), base=10.0, num=200 ) self.s_f, self.k_f = FT(self.k_t) # for storing kernel fit results self.kfit = {} # initial fit self(30) class kernelExtractor(expExtractor): """ This class computes the rescale kernel between the synapse in the dendrite and the synapse in the soma. input: vdend (numpy 1d array): the somatic voltage recording with the synapse in the dendrites vsoma (numpy 1d array): the somatic voltage recording with the synapse at the some vbase (numpy 1d array): the somatic voltage recording without the synapse time (numpy 1d array): the time array associated with the recordings attributes: vdend: somatic voltage from dendritic spike arrival onwards vsoma: somatic voltage from somatic spike arrival onwards vbase: somatic voltage without spike arrival vdend_: vdend - vbase vsoma_: vsoma - vbase s_f: frequency array vdend_f: fourier transform of vdend_ vdend_f: fourier transform of vdend_ k_f: kernel in the frequency domain """ def __init__(self, vdend, vsoma, vbase, time): # time step dt = time[1] - time[0] # voltage deviations from baseline vdend_ = vdend - vbase vsoma_ = vsoma - vbase # find when the spike arrives istart = np.where(vdend_ > 1e-9)[0][0] - 1.0 # time array PSP self.tarr = time[istart:] - istart * dt # full potentials after spike arrival self.vdend = vdend[istart:] self.vsoma = vsoma[istart:] self.vbase = vbase[istart:] # create a smoothing window if PSP hasn't gone to zero yet if vdend_[-1] > 1e-9: vwindow = np.cos((np.pi / 2.0) * (self.tarr / self.tarr[-1])) else: vwindow = np.ones(self.tarr.shape) # PSPs self.vdend_ = vdend_[istart:] * vwindow self.vsoma_ = vsoma_[istart:] * vwindow # fourrier transform FT = FourierQuadrature( self.tarr, fmax=np.log10(1.0 / (dt * 1e-3)), base=10.0, num=200 ) self.s_f, self.vdend_f = FT(self.vdend_) self.s_f, self.vsoma_f = FT(self.vsoma_) # compute the kernel self.k_f = self.vdend_f / self.vsoma_f # for storing kernel fit results self.kfit = {} # initial fit self(30) class expNReducer(expExtractor): def __init__(self, alphas, gammas): FEF = fExpFitter() # creat time array self.tarr = np.linspace(0.0, 100.0, 1e3) # create frequency array self.s_f = create_logspace_freqarray(fmax=7, base=10, num=200) # kernel in freqency domain self.k_f = FEF.sumFExp(self.s_f, -alphas * 1e3, gammas * 1e3) # for storing kernel fit results self.kfit = {} # initial fit self(30) class FourierTools: def __init__(self, t_inp): self._set_freq_and_time_arrays(t_inp) def _set_default_freq_array_vector_fit(self): # reasonable parameters to construct frequency array dt = 0.1 * 1e-3 # s N = 2**12 smax = np.pi / dt # Hz ds = np.pi / (N * dt) # Hz # frequency array for vector fitting self.freqs_vfit = np.arange(-smax, smax, ds) * 1j # Hz def _set_default_freq_array_quadrature(self, t_inp): self.t = t_inp if isinstance(t_inp, FourierQuadrature): self.t = t_inp.t # reasonable parameters for FourierQuadrature self.fq = FourierQuadrature(self.t, fmax=7.0, base=10.0, num=200) def _set_freq_and_time_arrays(self, t_inp): self._set_default_freq_array_vector_fit() self._set_default_freq_array_quadrature(t_inp) self._slice_vfit = np.s_[: len(self.freqs_vfit)] self._slice_quad = np.s_[len(self.freqs_vfit) :] self.freqs = np.concatenate((self.freqs_vfit, self.fq.s)) def inverse_fourier( self, func_vals_f, method: Literal["", "exp fit", "quadrature"] = "", compute_time_derivative=True, ): if method not in ["", "exp fit", "quadrature"]: raise IOError("Method should be empty string, 'exp fit' or 'quadrature'") # compute in time domain, method depends on ratio between spectral # power in zero frequency vs max frequency component # typically, this will mean exponential fit is chosen for input # impedances and explicit quadrature for transfer impedances f_arr = func_vals_f[self._slice_quad] criterion_eval = np.abs(f_arr[-1]) / np.abs(f_arr[self.fq.ind_0s]) criterion = criterion_eval <= 1e-3 if criterion_eval > 1e-10: # if there is substantial spectral power in the max frequency # components, we smooth the function with a squared cosine window # to reduce oscillations window = np.cos(np.pi * self.fq.s.imag / (2.0 * np.abs(self.fq.s[-1]))) ** 2 else: window = np.ones_like(self.fq.s) # compute kernel through quadrature method func_vals_t = ( self.fq.ft_inv(window * func_vals_f[self._slice_quad])[1].real * 1e-3 ) # MOhm/s -> MOhm/ms if compute_time_derivative: # compute differentiated kernel dfunc_vals_t_dt = ( self.fq.ft_inv(self.fq.s * window * func_vals_f[self._slice_quad])[ 1 ].real * 1e-6 ) # MOhm/s^2 -> MOhm/ms^2 # when the criterion is satified, or if the default method is # overridden to 'quadrature', we always return the the quadrature result if (method == "" and criterion) or method == "quadrature": if compute_time_derivative: return func_vals_t, dfunc_vals_t_dt else: return func_vals_t # this code will only be reached when `method` is "exp_fit", or when # `method` is "" but the criterion is not satisfied # we set a custom set of initial poles for the vector fit algorithm # NOTE: not used at the moment, have to figure out why # initpoles = np.concatenate(( # np.linspace(.5, 10**1.3, 40)[:-1], # np.logspace( # 1.3, np.log10(self.freqs[self._slice_vfit][-1].imag), # num=40, base=10, # ) # )) # compute kernel as superposition of exponentials in the frequency domain f_exp_fitter = fExpFitter() alpha, gamma, pairs, rms = f_exp_fitter.fitFExp( self.freqs[self._slice_vfit], func_vals_f[self._slice_vfit], deg=40, initpoles="log10", realpoles=True, zerostart=False, constrained=True, reduce_numexp=False, ) zk = Kernel({"a": alpha * 1e-3, "c": gamma * 1e-3}) if compute_time_derivative: dzk_dt = zk.diff() # linear fit of c in the time domain to the quadrature-computed kernels # can improve accuracy # NOTE: not used at the moment, have to figure out why # w = np.concatenate( # (self.fq.t[self.fq.t < 1.], np.ones_like(self.fq.t[self.fq.t >= 1.])) # ) # zk.fit_c(self.fq.t, func_vals_t, w=w) # evaluate kernel in the time domain func_vals_t = zk(self.fq.t) if compute_time_derivative: # linear fit of c in the time domain to the quadrature-computed kernels # can improve accuracy # NOTE: not used at the moment, have to figure out why # dzk_dt.fit_c(self.fq.t, dfunc_vals_t_dt, w=w) # compute differentiated kernel dfunc_vals_t_dt = zk.diff(self.fq.t) return func_vals_t, dfunc_vals_t_dt else: return func_vals_t