# Source code for XRStools.xrs_scans

#!/usr/bin/python
# Filename: xrs_scans.py

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

#/*##########################################################################
#
# The XRStools software package for XRS spectroscopy
#
# Copyright (c) 2013-2014 European Synchrotron Radiation Facility
#
# This file is part of the XRStools XRS spectroscopy package developed at
# the ESRF by the DEC and Software group.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################*/
__author__ = "Christoph J. Sahle - ESRF"
__contact__ = "christoph.sahle@esrf.fr"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"


import numpy as np
from . import xrs_utilities, math_functions, xrs_fileIO, xrs_rois
import h5py
import os

from itertools import groupby
from scipy import optimize
from scipy.interpolate import Rbf
from matplotlib import pylab as plt
from scipy import ndimage

# try to import the fast PyMCA parsers
# Try to import the fast PyMca parsers; fall back to the slower built-in
# parsers in xrs_fileIO when PyMca5 is not installed.
try:
    import PyMca5.PyMcaIO.EdfFile as EdfIO
    import PyMca5.PyMcaIO.specfilewrapper as SpecIO
    use_PyMca = True
except ImportError:
    # only catch the missing-package case; a bare 'except:' would also have
    # swallowed KeyboardInterrupt and real errors inside PyMca
    use_PyMca = False

print( ' >>>>>>>>  use_PyMca ' , use_PyMca)

__metaclass__ = type # new style classes (Python 2 relic, harmless on Python 3)

class Scan:
    """ **Scan**
    Class for manipulating scan data from the Hydra and Fourc spectrometers.

    All relevant information from the SPEC- and EDF-files are organized
    in instances of this class.

    Attributes:
        edfmats     (np.array): Array containing all 2D images that belong to the scan.
        scan_number (int): Scan number as in the SPEC file.
        scan_type   (string): Keyword, used later to group scans (add similar scans, etc.).
        energy      (np.array): Array containing the energy axis (1D).
        monitor     (np.array): Array containing the monitor signal (1D).
        counters    (dictionary): Counters with associated data from the SPEC file.
        motors      (dictionary): Motor positions as found in the SPEC file header.
        eloss       (np.array): Array of the energy loss scale.
        signals     (np.array): Array of signals extracted from the ROIs.
        errors      (np.array): Array of Poisson errors.
        cenom       (list): Center of mass for each ROI (used if scan is an elastic line scan).
        signals_pw  (list): Pixelwise (PW) data, one array of PW data per ROI.
        errors_pw   (list): Pixelwise (PW) Poisson errors, one array of PW errors per ROI.
        cenom_pw    (list): Center of mass for each pixel.
        signals_pw_interp (list): Interpolated signals for each pixel.
        Ein         (float): Incident energy, used if energy2 is scanned.
        raw_signals (dict): Dictionary of raw data (summed, line-by-line, pixel-by-pixel).
        raw_errors  (dict): Dictionary of raw errors (summed, line-by-line, pixel-by-pixel).
    """

    # Cache of monitor normalization factors keyed by the lower-cased monitor
    # column name. NOTE(review): this is a class-level attribute, i.e. shared
    # by ALL Scan instances in a session — confirm the sharing is intended.
    normalizationDict = {}

    def __init__( self ):
        self.edfmats     = np.array([])   # stack of 2D detector images
        self.scan_number = None           # scan number as in the SPEC file
        self.scan_type   = None           # keyword used to group scans
        self.energy      = np.array([])   # energy axis (1D)
        self.monitor     = np.array([])   # monitor signal (1D)
        self.counters    = {}             # SPEC counters for this scan
        self.motors      = {}             # motor positions from the SPEC header
        self.eloss       = np.array([])   # energy-loss scale
        self.signals     = np.array([])   # per-ROI signals
        self.errors      = np.array([])   # per-ROI Poisson errors
        self.cenom       = []             # per-ROI centers of mass (elastic scans)
        self.signals_pw  = []             # pixel-wise signals, one array per ROI
        self.errors_pw   = []             # pixel-wise errors, one array per ROI
        self.cenom_pw    = []             # per-pixel centers of mass
        self.signals_pw_interp = []       # interpolated pixel-wise signals
        self.Ein         = None           # incident energy (energy2 scans)
        self.scan_motor  = None           # name of the scanned motor/energy column
        self.raw_signals = {}             # raw data per ROI (sum/row/pixel layout)
        self.raw_errors  = {}             # raw errors per ROI
        self.__signals_normalized__ = False  # True once data were divided by the monitor
[docs] def load( self, path, SPECfname, EDFprefix, EDFname, EDFpostfix, scan_number, \ direct=False, roi_obj=None, scaling=None, scan_type='generic', \ en_column=None, moni_column='izero', method='sum', comp_factor=None,\ rot_angles=None, clean_edf_stack=False, cenom_dict=None, storeInsets=False ): """ **load** Parse SPEC-file and EDF-files for loading a scan. Note: If 'direct' is 'True' all EDF-files will be deleted after application of the ROIs. Args: path (str): Absolute path to directory in which the SPEC-file is located. SPECfname (str): SPEC-file name. EDFprefix (str): Prefix for the EDF-files. EDFpostfix (str): Postfix for the EDF-files. scan_number (int): Scan number of the scan to be loaded. direct (boolean): If 'True', all EDF-files will be deleted after loading the scan. method (str): Keyword specifying the selected choice of data treatment: can be 'sum', 'row', 'pixel', or 'column'. Default is 'sum'. """ print( 'Parsing EDF- and SPEC-files of scan No. %s.' % scan_number) self.scan_number = scan_number # load SPEC-file fname = os.path.join(path , SPECfname) if use_PyMca == True: spec_data, self.motors, self.counters, lables = xrs_fileIO.PyMcaSpecRead_my(fname,scan_number) else: spec_data, self.motors, self.counters = xrs_fileIO.SpecRead(fname,scan_number) # if isinstance(self.motors,dict): # print(" CONVERSIONE 1 ") # self.motors = list( self.motors.items() ) # assign values, energy only if en_column is specified, first counter in SPECfile otherwise if en_column: self.energy = np.array(self.counters[en_column.lower()]) self.scan_motor = en_column.lower() else: self.energy = np.array(self.counters[lables[0].lower()]) self.scan_motor = lables[0].lower() # normalization the_moni = np.array(self.counters[moni_column.lower()]) if moni_column.lower() == 'izero': the_moni *= np.mean(self.counters['seconds']) if moni_column not in self.normalizationDict: self.normalizationDict[moni_column.lower()] = np.mean(the_moni)/ np.mean(self.counters['seconds']) 
self.monitor = the_moni/self.normalizationDict[moni_column.lower()] # assign the scan type self.scan_type = scan_type # load EDF-files if use_PyMca == True: self.edfmats = xrs_fileIO.ReadEdfImages_PyMca( self.counters['ccdno'], path, EDFprefix, EDFname, EDFpostfix) else: self.edfmats = xrs_fileIO.ReadEdfImages_my( self.counters['ccdno'], path, EDFprefix, EDFname, EDFpostfix ) # remove totally saturated images if clean_edf_stack: self.edfmats = edf_cleaner(self.edfmats, 1.0e8) # apply ROIs (if applicable) if direct and isinstance( roi_obj, xrs_rois.roi_object ): self.get_raw_signals( roi_obj, method=method, scaling=scaling, rot_angles=rot_angles , storeInsets = storeInsets) if cenom_dict: if method == 'row': self.get_signals( method='row', comp_factor=comp_factor, scaling=scaling ) elif method == 'sum': self.get_signals( method='sum', scaling=scaling ) elif method == 'pixel': self.get_signals( method='pixel', cenom_dict=cenom_dict ) elif method == 'pixel2': self.get_signals( method='pixel2', cenom_dict=cenom_dict )
#elif method == 'column': # self.get_signals( method='column', cenom_dict=cenom_dict )
[docs] def assign( self, edf_arrays, scan_number, energy_scale, monitor_signal, counters, \ motor_positions, specfile_data, scan_type='generic' ): """ **assign** Method to group together existing data from a scan (for backward compatibility). Args: edf_arrays (np.array): Array of all 2D images that belong to the scan. scan_number (int): Number under which this scan can be found in the SPEC file. energy_scale (np.array): Array of the energy axis. monitor_signal (np.array): Array of the monitor signal. counters (dictionary): Counters with associated data from the SPEC file. motor_positions (list): Motor positions as found in the SPEC file header. specfile_data (np.array): Matrix with all data as found in the SPEC file. scantype (str): Keyword, used later to group scans (add similar scans, etc.). """ self.edfmats = np.array(edf_arrays) self.scan_number= scan_number self.energy = np.array(energy_scale) self.monitor = np.array(monitor_signal) self.counters = counters self.motors = motor_positions # if isinstance(self.motors,dict): # print(" CONVERSIONE 1 in assign") # self.motors = list( self.motors.items() ) self.spec_data = specfile_data self.scan_type = scan_type
[docs] def save_hdf5( self, fname ): """ **save_hdf5** Save a scan in an HDF5 file. Note: HDF5 files are strange for overwriting files. Args: fname (str): Path and filename for the HDF5 file. """ if isinstance(fname, h5py.Group): f=fname else: f = h5py.File(fname, "w") h5_md = f.require_group("motorDict") for mn, mv in self.motors.items(): h5_md[mn] = mv h5_md = f.require_group("counters") for mn, mv in self.counters.items(): h5_md[mn] = mv for attr in ['edfmats', 'scan_number', 'energy', 'monitor', 'scan_type','signals','errors']: f[attr] = eval( 'self.' + attr ) for key in self.used_masks.keys(): hgroup = f.require_group(key) pos, mask = self.used_masks[key] hgroup["mask"] = mask hgroup["mask_pos"] = pos if hasattr( self, "insets") : hgroup["insets"] = self.insets [key] if not isinstance(fname, h5py.Group): f.close()
    def load_hdf5( self, fname ):
        """ **load_hdf5**
        Load a scan from an HDF5 file.

        Args:
            fname (str or h5py.Group): Filename of the HDF5 file, or an
                already open h5py group to read from (left open in that case).
        """
        doclose = False
        if isinstance(fname, h5py.Group):
            f=fname
        else:
            doclose = True
            f = h5py.File(fname, "r")

        # plain array/scalar attributes
        # NOTE(review): 'monitor' is written by save_hdf5 but not restored
        # here — confirm whether the omission is intended.
        for attr in ['edfmats', 'scan_number', 'energy', 'scan_type', 'signals','errors']:
            setattr( self , attr ,f[attr][()])

        # motor positions, if present
        self.motors = {}
        if "motorDict" in f:
            subGroup = f["motorDict"]
            for mname in subGroup:
                self.motors[ mname] = subGroup[mname][()]

        # counters, if present
        self.counters = {}
        if "counters" in f:
            subGroup = f["counters"]
            for mname in subGroup:
                self.counters[mname] = subGroup[mname][()]

        # ROI masks: every group whose name starts with "ROI"
        self.used_masks = {}
        for key in f:
            if str(key)[:3] == "ROI" :
                mygroup = f[key]
                self.used_masks[key] = [mygroup["mask_pos"][:].tolist(), np.array(mygroup["mask"][:])]

        if doclose :
            f.close()
[docs] def get_raw_signals( self, roi_obj, method='sum', scaling=None, rot_angles=None, storeInsets=False ): """ **get_raw_signals** Applies given ROIs to EDF-images. Applies the provided ROIs to the EDF-images in the specified mannar: summing, line-by-line, or pixel-by-pixel. Depending on the choice, the resulting data array is 2D (sum), 3D (line-by-line), or 4D (pixel-by-pixel). The scanned direction is always the first dimension of the resulting data matrix. Args: roi_obj (instance): Instance of the 'XRStools.xrs_rois.roi_object' class defining the ROIs. method (string): Keyword specifying the selected choice of data treatment: can be 'sum', 'row', 'pixel', or 'column'. Default is 'sum'. scaling (np.array): Array of float-type scaling factors (factor for each ROI). Returns: None if there are not EDF-files to apply the ROIs to. """ self.used_masks={} if storeInsets: self.insets={} for key, (pos, M) in roi_obj.red_rois.items(): S = M.shape self.insets[key] = np.zeros( [ len(self.edfmats), S[0] , S[1] ], self.edfmats.dtype ) for ii in range(len(self.edfmats)): for key, (pos, M) in roi_obj.red_rois.items(): S = M.shape inset = (slice( pos[0], pos[0]+(S[0]) ), slice( pos[1], pos[1]+(S[1]) )) self.used_masks[key] = ( pos, M ) if storeInsets: self.insets[key][ii] = self.edfmats[ii, inset[0], inset[1]] * (M/M.max()) # sum if method == 'sum': print('selected method is \'sum\': summing up pixels from each ROI.') signals = {} # dict (one entry per ROI, with vector (one entry per energy point)) errors = {} # sqrt of the sum of counts for key, (pos, M) in roi_obj.red_rois.items(): signals[key] = np.zeros((len(self.energy))) errors[key] = np.zeros((len(self.energy))) for ii in range(len(self.edfmats)): ind = 0 for key, (pos, M) in roi_obj.red_rois.items(): S = M.shape inset = (slice( pos[0], pos[0]+(S[0]) ), slice( pos[1], pos[1]+(S[1]) )) signals[key][ii] = np.sum( self.edfmats[ii, inset[0], inset[1]] * (M/M.max())) errors[key][ii] = np.sqrt(signals[key][ii]) 
signals[key][ii] /= self.monitor[ii] errors[key][ii] /= self.monitor[ii] ind += 1 # row elif method == 'row': print('selected method is \'row\': summing over non-dispersive direction for each ROI.') signals = {} # dict (one entry per ROI, with 2D matrix (energy vs row)) errors = {} # sqrt of the sum of counts rot_angles_dict = {} # put possible rotation angles into dict counter = 0 for key, (pos, M) in sorted(roi_obj.red_rois.items() ): signals[key] = np.zeros((len(self.energy), M.shape[0])) errors[key] = np.zeros((len(self.energy), M.shape[0])) if rot_angles is not None: rot_angles_dict[key] = rot_angles[counter] counter += 1 for ii in range(len(self.edfmats)): ind = 0 for key, (pos, M) in roi_obj.red_rois.items(): S = M.shape inset = (slice( pos[0], pos[0]+(S[0]) ), slice( pos[1], pos[1]+(S[1]) )) # rotate raw_signals and raw_errors if method is 'line' and angles are provided if rot_angles: if len(roi_obj.red_rois) is not len(rot_angles): print('Only %d rotation angles provided for %d ROIs. 
Will end here.'%(len(rot_angles), len(self.raw_signals))) return # rotate images before summation orig_slice = self.edfmats[ii, inset[0], inset[1]] * (M/M.max()) slice_for_sum = ndimage.interpolation.rotate( orig_slice, rot_angles_dict[key],\ reshape=False, order=0, mode='constant' ) else: slice_for_sum = self.edfmats[ii, inset[0], inset[1]] * (M/M.max()) signals[key][ii,:] = np.sum( slice_for_sum , axis=1) errors[key][ii,:] = np.sqrt(signals[key][ii,:]) signals[key][ii,:] /= self.monitor[ii] errors[key][ii,:] /= self.monitor[ii] ind += 1 # column elif method == 'column': print('selected method is \'column\': summing over dispersive direction for each ROI.') signals = {} # dict (one entry per ROI, with 2D matrix (energy vs row)) errors = {} # sqrt of the sum of counts for key, (pos, M) in roi_obj.red_rois.items(): signals[key] = np.zeros((len(self.energy), M.shape[1])) errors[key] = np.zeros((len(self.energy), M.shape[1])) for ii in range(len(self.edfmats)): ind = 0 for key, (pos, M) in roi_obj.red_rois.items(): S = M.shape inset = (slice( pos[0], pos[0]+(S[0]) ), slice( pos[1], pos[1]+(S[1]) )) signals[key][ii,:] = np.sum( self.edfmats[ii, inset[0], inset[1]] * (M/M.max()), axis=0) errors[key][ii,:] = np.sqrt( signals[key][ii,:] ) signals[key][ii,:] /= self.monitor[ii] errors[key][ii,:] /= self.monitor[ii] ind += 1 # pixel elif method == 'pixel' or method == 'pixel2': print('selected method is \'pixel\': returning ROI pixel-by-pixel.') signals = {} # dict (one entry per ROI, with 3D matrix (energy vs pixel_0 vs pixel_1)) errors = {} # sqrt of the sum of counts for key, (pos, M) in roi_obj.red_rois.items(): signals[key] = np.zeros((len(self.energy), M.shape[0], M.shape[1])) errors[key] = np.zeros((len(self.energy), M.shape[0], M.shape[1])) for ii in range(len(self.edfmats)): ind = 0 for key, (pos, M) in roi_obj.red_rois.items(): S = M.shape inset = (slice( pos[0], pos[0]+(S[0]) ), slice( pos[1], pos[1]+(S[1]) )) signals[key][ii,:,:] = self.edfmats[ii, inset[0], 
inset[1]] * (M/M.max()) errors[key][ii,:,:] = np.sqrt( signals[key][ii,:,:] ) signals[key][ii,:,:] /= self.monitor[ii] errors[key][ii,:,:] /= self.monitor[ii] ind += 1 # unknown method else: print( 'Unknown integration method. Use either \'sum\', \'row\', or \'pixel\'.' ) return # set normalization self.__signals_normalized__ = True # assign for key,ii in zip(sorted(roi_obj.red_rois, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(roi_obj.red_rois))): if np.any(scaling): self.raw_signals[key] = signals[key] * scaling[ii] self.raw_errors[key] = errors[key] * scaling[ii] else: self.raw_signals[key] = signals[key] self.raw_errors[key] = errors[key] # delete EDF-files (these are obsolete after pixel-by-pixel integration) print( 'Deleting ++ EDF-files of scan No. %s.' % self.scan_number ) self.edfmats = np.array([])
[docs] def get_signals( self, method='sum', cenom_dict=None, comp_factor=None, scaling=None, PIXEL_SIZE=0.055 ): """ **get_signals** Turns pixel-, column- or sum-wise raw-data into data. Takes the raw-data after application of the ROIs and applies the chosen compensation scheme. Args: method (str): Keyword specifying the selected choice of data treatment: can be 'sum', 'row', or 'pixel'. Default is 'sum'. cenom_dict (dict): Dictionary with one entry per ROI holding information about the center of mass of the according elastic line. comp_factor (float): Factor used in the RIXS-style line-by-line compensation. scaling (np.array): Array of float-type scaling factors (factor for each ROI). """ if method == 'sum': self.signals = np.zeros(( len(self.energy), len(self.raw_signals) )) self.errors = np.zeros(( len(self.energy), len(self.raw_signals) )) for key,ii in zip(sorted(self.raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(self.raw_signals))): if not self.__signals_normalized__: self.signals[:,ii] = self.raw_signals[key]/self.monitor self.errors[:,ii] = self.raw_errors[key]/self.monitor else: self.signals[:,ii] = self.raw_signals[key] self.errors[:,ii] = self.raw_errors[key] elif method == 'pixel': # assign sign of compensation if self.scan_motor == 'energy': comp_direction = 1.0 elif self.scan_motor == 'anal energy': comp_direction = -1.0 else: print('Unknown energy motor, will break here.') return self.signals = np.zeros(( len(self.energy), len(self.raw_signals) )) self.errors = np.zeros(( len(self.energy), len(self.raw_signals) )) for key,ii in zip(sorted(self.raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(self.raw_signals))): S = cenom_dict[key].shape master_cenom = cenom_dict[key][int(S[0]/2.),int(S[1]/2.)] for dim1 in range(self.raw_signals[key].shape[1]): for dim2 in range(self.raw_signals[key].shape[2]): x = self.energy + ( comp_direction*cenom_dict[key][dim1,dim2] - 
comp_direction*master_cenom ) y = self.raw_signals[key][:,dim1,dim2] rbfi = Rbf( x, y, function='linear' ) self.signals[:,ii] += rbfi(self.energy) y = self.raw_errors[key][:,dim1,dim2] rbfi = Rbf( x, y, function='linear' ) self.errors[:,ii] += rbfi(self.energy)**2 if not self.__signals_normalized__: self.errors[:,ii] = np.sqrt( self.errors[:,ii] )/self.monitor self.signals[:,ii] /= self.monitor else: self.errors[:,ii] = np.sqrt( self.errors[:,ii] ) elif method == 'pixel2': # assign sign of compensation if self.scan_motor == 'energy': comp_direction = 1.0 elif self.scan_motor == 'anal energy': comp_direction = -1.0 else: print('Unknown energy motor, will break here.') return self.signals = np.zeros(( len(self.energy), len(self.raw_signals) )) self.errors = np.zeros(( len(self.energy), len(self.raw_signals) )) energy = self.energy #* 1e3 # energy in eV ## meanmon = np.mean(self.monitor) for key,ii in zip(sorted(self.raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(self.raw_signals))): S = cenom_dict[key].shape #the_hist = np.histogram(cenom_dict[key][cenom_dict[key]>0.0 ], bins=10) #master_cenom = np.average(the_hist[1][1:], weights= the_hist[0])#cenom_dict[key][int(S[0]/2.),int(S[1]/2.)] master_cenom = np.mean(cenom_dict[key][cenom_dict[key]>0.0 ]) y = self.raw_signals[key] yn = y# (y.T/self.monitor).T * meanmon yc = np.zeros(( len(energy),S[0]*S[1] )) for dim1 in range(S[0]): for dim2 in range(S[1]): sort = np.argsort(energy) if cenom_dict[key][dim1,dim2] > 0.0: yc[:,dim1*dim2] = np.interp( energy[sort] -(comp_direction*(cenom_dict[key][dim1,dim2] - \ master_cenom)), energy[sort], yn[sort,dim1,dim2], \ left=float('nan'), right=float('nan') ) else: yc[:,dim1*dim2] = yn[sort,dim1,dim2] self.signals[:,ii] = xrs_utilities.nonzeroavg(yc) self.errors[:,ii] = np.sqrt(self.signals[:,ii]) # and normalize to I0 if not self.__signals_normalized__: self.signals[:,ii] /= self.monitor self.errors[:,ii] /= self.monitor elif method == 'row': # 
assign sign of compensation !!! test this !!! if self.scan_motor == 'energy': comp_direction = 1.0 elif self.scan_motor == 'anal energy': comp_direction = -1.0 else: print('Unknown energy motor, will break here.') return self.signals = np.zeros(( len(self.energy), len(self.raw_signals) )) self.errors = np.zeros(( len(self.energy), len(self.raw_signals) )) energy = self.energy * 1e3 # energy in eV ### meanmon = np.mean(self.monitor) for key,ii in zip(sorted(self.raw_signals, key = lambda x: int(''.join(filter(str.isdigit, str(x) ))) ), range(len(self.raw_signals))): y = self.raw_signals[key] yn = (y.T/self.monitor).T #### * meanmon CHECK THIS REMOVAL meanii = len(range(yn.shape[1]))//2 yc = np.zeros_like(y) for jj in range(yn.shape[1]): sort = np.argsort(energy) yc[:,jj] = np.interp( energy[sort] + (jj-meanii)*comp_direction*comp_factor*PIXEL_SIZE, energy[sort], yn[sort,jj], left=float('nan'), right=float('nan') ) self.signals[:,ii] = xrs_utilities.nonzeroavg(yc) self.errors[:,ii] = np.sqrt(self.signals[:,ii]) # and normalize to I0 if not self.__signals_normalized__: self.signals[:,ii] /= self.monitor self.errors[:,ii] /= self.monitor else: print( 'Unknown integration method. Use either \'sum\', \'row\', or \'pixel\'.' ) return # set normalization self.__signals_normalized__ = True
[docs] def apply_rois( self, roi_obj, scaling=None ): """ **apply_rois** Sums up intensities in each ROI. Note: Old function, keeping for backward compatibility. Args: roi_obj (instance): Instance of the 'XRStools.xrs_rois.roi_object' class defining the ROIs. scaling (np.array): Array of float-type scaling factors (factor for each ROI). Returns: None if there are not EDF-files to apply the ROIs to. """ # make sure there is data loaded if self.edfmats.size == 0: print( 'Please load some EDF-files first.' ) return # apply ROIs signals = np.zeros((len(self.energy), len(roi_obj.red_rois))) for ii in range(len(self.edfmats)): ind = 0 for key, (pos, M) in roi_obj.red_rois.items(): S = M.shape inset = (slice( pos[0], pos[0]+(S[0]) ), slice( pos[1], pos[1]+(S[1]) )) signals[ii,ind] = np.sum( self.edfmats[ii, inset[0], inset[1]] * (M/M.max())) ind += 1 # assign self.signals = signals self.errors = np.sqrt(signals) # apply scaling (if applicable) if np.any(scaling): # make sure, there is one scaling factor for each ROI assert len(scaling) == signals.shape[1] for ii in range(signals.shape[1]): self.signals[:,ii] *= scaling[ii] self.errors[:,ii] *= scaling[ii]
[docs] def apply_rois_pw( self,roi_obj, scaling=None ): """ **apply_rois_pw** Pixel-wise reading of the ROIs' pixels into a list of arrays. I.e. each n-pixel ROI will have n Spectra, saved in a 2D array. Args: roi_obj (instance): Instance of the 'XRStools.xrs_rois.roi_object' class defining the ROIs. scaling (list) or (np.array): Array or list of float-type scaling factors (one factor for each ROI). """ data = [] # list of 2D arrays (energy vs. intensity for each pixel inside a single ROI) errors = [] for n in range(len(roi_obj.indices)): # each ROI roidata = np.zeros((len(self.edfmats),len(roi_obj.indices[n]))) # 2D np array energy vs pixels in current roi for m in range(len(self.edfmats)): # each energy point along the scan for l in range(len(roi_obj.indices[n])): # each pixel on the detector roidata[m,l] = self.edfmats[m,roi_obj.indices[n][l][0],roi_obj.indices[n][l][1]] data.append(roidata) # list which contains one array (energy point, pixel) per ROI errors.append(np.sqrt(roidata)) self.signals_pw = data self.errors_pw = errors # apply scaling (if applicable) if np.any(scaling): # make sure, there is one scaling factor for each ROI assert len(scaling) == len(roi_obj.indices) for ii in range(len(roi_obj.indices)): self.signals_pw[ii] *= scaling[ii] self.errors_pw[ii] *= scaling[ii]
[docs] def append_scan( self, scan, method='sum', where='right' ): """ **append_scan** Appends scan to the current scan, either at higher energies (where='right') or at lower energies (where='left'). Args: scan (obj): Object of the Scan class. method (str): Keyword specifying the selected choice of data treatment: can be 'sum', 'row', or 'pixel'. Default is 'sum'. where (str): Keyword specifying if the scan should be appended at lower (where='left') or highger energies (where='righ'). Default is 'right'. """ if where == 'right': self.energy = np.append( self.energy , scan.energy ) self.monitor = np.append( self.monitor, scan.monitor ) for key in self.raw_signals: self.raw_signals[key] = np.append( self.raw_signals[key], scan.raw_signals[key], axis=0 ) self.raw_errors[key] = np.append( self.raw_errors[key], scan.raw_errors[key], axis=0 ) if where == 'left': self.energy = np.append( scan.energy, self.energy ) self.monitor = np.append( scan.monitor, self.monitor ) for key in self.raw_signals: self.raw_signals[key] = np.append( scan.raw_signals[key], self.raw_signals[key], axis=0 ) self.raw_errors[key] = np.append( scan.raw_errors[key], self.raw_errors[key], axis=0 )
[docs] def insert_scan( self, scan, method='sum', where=None ): """ **insert_scan** Inserts another scan into the current instance. Args: scan (obj): Object of the Scan class. method (str): Keyword specifying the selected choice of data treatment: can be 'sum', 'row', or 'pixel'. Default is 'sum'. where (list): Optional tuple of energy values (high and low (in keV)) for where to insert the scan. By default (None), the lowest and highest energy values of the given scan will be used. """ # find where given scan should be inserted if not where: low_inds = np.where( self.energy < scan.get_E_start() )[0] high_inds = np.where( self.energy > scan.get_E_end() )[0] else: low_inds = np.where( self.energy < where[0] )[0] high_inds = np.where( self.energy > where[1] )[0] # 'sum' if method == 'sum': self.energy = np.append( self.energy[low_inds], np.append( scan.energy, self.energy[high_inds] ) ) self.monitor = np.append( self.monitor[low_inds], np.append( scan.monitor, self.monitor[high_inds] ) ) for key in self.raw_signals: self.raw_signals[key] = np.append( self.raw_signals[key][low_inds], np.append( scan.raw_signals[key], self.raw_signals[key][high_inds], axis=0 ), axis=0 ) self.raw_errors[key] = np.append( self.raw_errors[key][low_inds], np.append( scan.raw_errors[key], self.raw_errors[key][high_inds], axis=0 ), axis=0 ) # 'pixel' elif method in [ 'pixel' , 'pixel2']: self.energy = np.append( self.energy[low_inds], np.append( scan.energy, self.energy[high_inds] ) ) self.monitor = np.append( self.monitor[low_inds], np.append( scan.monitor, self.monitor[high_inds] ) ) for key in self.raw_signals: self.raw_signals[key] = np.append( self.raw_signals[key][low_inds,:,:], np.append( scan.raw_signals[key], self.raw_signals[key][high_inds,:,:], axis=0 ), axis=0 ) self.raw_errors[key] = np.append( self.raw_errors[key][low_inds,:,:], np.append( scan.raw_errors[key], self.raw_errors[key][high_inds,:,:], axis=0 ), axis=0 ) # 'row' elif method == 'row': self.energy = np.append( 
self.energy[low_inds], np.append( scan.energy, self.energy[high_inds] ) ) self.monitor = np.append( self.monitor[low_inds], np.append( scan.monitor, self.monitor[high_inds] ) ) for key in self.raw_signals: self.raw_signals[key] = np.append( self.raw_signals[key][low_inds,:], np.append( scan.raw_signals[key], self.raw_signals[high_inds,:], axis=0 ), axis=0 ) self.raw_errors[key] = np.append( self.raw_errors[key][low_inds,:], np.append( scan.raw_errors[key], self.raw_errors[high_inds,:], axis=0 ), axis=0 ) else: print( 'Unknown method. Use either \'sum\', \'row\', or \'pixel\'.' ) return
[docs] def add_scan(self, scan, method='sum', interp=False): """ **add_scan** Adds signals from a different scan. Args: scan (obj): Object of the Scan class. method (str): Keyword specifying the selected choice of data treatment: can be 'sum', 'row', or 'pixel'. Default is 'sum'. interp (boolean): Boolean specifying if norm-conserving linear wavelet interpolation should be used, False by default. """ # @@@@@@@@@@@@@@@@@@ DA VERIFICARE TUTTO L'ACCOUNTING # @@@@@@@@@@@@@@@@@@ changed by christoph: 13/07/2018 # @@@@@@@@@@@@@@@@@@ please double check if not interp: # sum monitors av_monitor = np.sum((self.monitor, scan.monitor), axis=0) for key in self.raw_signals: if method in [ 'sum', 'row', 'column']: # recover raw counts signals1 = self.raw_signals[key]*self.monitor signals2 = scan.raw_signals[key]*scan.monitor # sum signals av_signals = np.sum((signals1, signals2), axis=0) # errors of summed signals av_errors = np.sqrt(av_signals) # assign and renormalize self.raw_signals[key] = av_signals/av_monitor self.raw_errors[key] = av_errors/av_monitor if method in [ 'pixel', 'pixel2']: # recover raw counts signals1 = self.raw_signals[key]*self.monitor[:,None,None] signals2 = scan.raw_signals[key]*scan.monitor[:,None,None] # sum signals av_signals = np.sum((signals1, signals2), axis=0) # errors of summed signals av_errors = np.sqrt(av_signals) # assign and renormalize self.raw_signals[key] = av_signals/av_monitor[:,None,None] self.raw_errors[key] = av_errors/av_monitor[:,None,None] if interp: # find longest scan dimension dim0 = np.amax([len(self.energy), len(scan.energy)]) # sum the monitor signal (always 1D) mm = np.zeros((dim0, 2))*np.nan mm[0:len(self.monitor),0] = self.monitor mm[0:len(scan.monitor),1] = scan.monitor av_monitor = np.nansum( mm , axis=1) # add also unfinished scans for key in self.raw_signals: # construct a matrix for nanmean depending on the shape of the raw_signals if method in ['sum']: # recover raw counts signals1 = 
self.raw_signals[key]*self.monitor signals2 = scan.raw_signals[key]*scan.monitor # signals yy = np.zeros((dim0, 2))*np.nan yy[0:len(signals1),0] = signals1 yy[0:len(signals2),1] = signals2 av_signals = np.nansum( yy , axis=1) # errors of summed signals av_errors = np.sqrt(av_signals) # self.raw_signals[key] = av_signals/av_monitor self.raw_errors[key] = av_errors/av_monitor if method in ['pixel', 'pixel2']: # recover raw counts signals1 = self.raw_signals[key]*self.monitor[:,None,None] signals2 = scan.raw_signals[key]*scan.monitor[:,None,None] # signals y1 = np.zeros((dim0, signals1.shape[1], signals1.shape[2]))*np.nan y2 = np.zeros((dim0, signals1.shape[1], signals1.shape[2]))*np.nan y1[0:len(signals1),:,:] = signals1 y2[0:len(signals2),:,:] = signals2 av_signals = np.nansum( (y1,y2) , axis=0 ) # errors of summed signals av_errors = np.sqrt( av_signals ) # self.raw_signals[key] = av_signals/av_monitor[:,None,None] self.raw_errors[key] = av_errors/av_monitor[:,None,None] if method in ['row']: # recover raw counts signals1 = self.raw_signals[key]*self.monitor[:,None] signals2 = scan.raw_signals[key]*scan.monitor[:,None] # signals y1 = np.zeros((dim0, signals1.shape[1]))*np.nan y2 = np.zeros((dim0, signals1.shape[1]))*np.nan y1[0:len(signals1),:,:] = signals1 y2[0:len(signals2),:,:] = signals2 av_signals = np.nansum( (y1,y2) , axis=0 ) # errors of summed signals av_errors = np.sqrt( av_signals ) # self.raw_signals[key] = av_signals/av_monitor[:,None] self.raw_errors[key] = av_errors/av_monitor[:,None] if interp=='Rbf': #rbfi = Rbf( scan.energy, scan.monitor, function='linear' ) #self.monitor += rbgi( self.energy ) for key in self.raw_signals: # remove zeros in errors self.raw_errors[key][self.raw_errors[key] == 0 ] = 1.0 scan.raw_errors[key][scan.raw_errors[key] == 0 ] = 1.0 # recover raw counts signals1 = self.raw_signals[key]*self.monitor signals2 = scan.raw_signals[key]*scan.monitor # sum interpolated signals rbfi = Rbf( scan.energy, signals2, function='linear' ) 
av_signals = signals1 + rbfi( self.energy ) # sum interpolated monitors rbfi = Rbf( scan.energy, scan.monitor, function='linear' ) av_monitor = self.monitor + rbfi( self.energy ) # errors of summed signals av_errors = np.sqrt(av_signals) # assign and renormalize self.raw_signals[key] = av_signals/av_monitor self.raw_errors[key] = av_errors/av_monitor
[docs] def get_type(self): """ **get_type** Returns the type of the scan. """ return self.scan_type
[docs] def get_scan_number(self): """ **get_scan_number** Returns the number of the scan. """ return self.scan_number
def get_shape(self):
    """Return the shape of the signals matrix, or None if no ROIs were applied yet."""
    if np.any(self.signals):
        return self.signals.shape
    print( 'please apply the ROIs first.' )
    return
def get_num_of_rois(self):
    """Return the number of ROIs (columns of the signals matrix), or None if unset."""
    if self.signals.any():
        return self.signals.shape[1]
    print( 'please apply the ROIs first.' )
    return
def get_E_start(self):
    """Return the first (lowest-index) energy value of the scan."""
    return self.energy[0]
def get_E_end(self):
    """Return the last (highest-index) energy value of the scan."""
    return self.energy[-1]
def get_resolution( self, keV2eV=True ):
    """Estimate the energy resolution (FWHM) for every ROI.

    Uses xrs_utilities.fwhm on each ROI's elastic-line signal; the result
    is converted from keV to eV when keV2eV is True.
    Returns (per-ROI resolutions as array, their mean, their std).
    """
    scale = 1.0e3 if keV2eV else 1.0
    resolutions = [ xrs_utilities.fwhm( self.energy, self.signals[:,col] )[0]*scale
                    for col in range(self.signals.shape[1]) ]
    return np.array(resolutions), np.mean(resolutions), np.std(resolutions)
class scan:
    """ Container class holding the raw data of a single 2D-detector scan. """
    def __init__(self,edf_arrays,scannumber,energy_scale,monitor_signal,counters,motor_positions,specfile_data,scantype='generic'):
        # raw data
        self.edfmats  = np.array(edf_arrays)    # stack of all 2D detector images of the scan
        self.number   = scannumber              # scan number in the SPEC file
        self.scantype = scantype                # keyword used to group similar scans
        self.energy   = np.array(energy_scale)  # energy axis
        self.monitor  = np.array(monitor_signal)# monitor signal
        # SPEC-file bookkeeping (possibly not immediately needed)
        self.counters = counters                # counter names found in the SPEC file for this scan
        self.motors   = motor_positions         # motor positions from the SPEC file header
        self.specdata = np.array(specfile_data) # full SPEC-file data block of this scan
        # results, filled after the ROIs have been defined
        self.eloss    = []                      # energy-loss scale
        self.signals  = []                      # ROI-integrated signals
        self.errors   = []                      # Poisson errors
        self.cenom    = []                      # centers of mass (elastic-line scans)
        # pixel-wise results
        self.signals_pw        = []
        self.cenom_pw          = []
        self.signals_pw_interp = []
[docs] def applyrois(self,indices,scaling=None): """ Sums up intensities found in the ROIs of each detector image and stores it into the self.signals attribute. roi_object = instance of the 'rois' class redining the ROIs scaling = numpy array of numerical scaling factors (has to be one for each ROIs) """ data = np.zeros((len(self.edfmats),len(indices))) for n in range(len(indices)): # each roi for m in range(len(self.edfmats)): # each energy point along the scan for l in range(len(indices[n])): # each pixel on the detector data[m,n] += self.edfmats[m,indices[n][l][0],indices[n][l][1]] self.signals = np.array(data) self.errors = np.sqrt(data) if np.any(scaling): assert len(scaling) == len(indices) # make sure, there is one scaling factor for each roi for ii in range(len(indices)): self.signals[:,ii] *= scaling[ii] self.errors[:,ii] *= scaling[ii]
[docs] def applyrois_pw(self,indices,scaling=None): """ Pixel-wise reading of the ROI's pixels into a list of arrays. I.e. each n-pixel ROI will have n Spectra, saved in a 2D array. Parameters ---------- indices : list List of indices (attribute of the xrs_rois class). scaling : list of flaots, optional Python list of scaling factors (one per ROI defined) to be applied to all pixels of that ROI. """ data = [] # list of 2D arrays (energy vs. intensity for each pixel inside a single ROI) for n in range(len(indices)): # each ROI roidata = np.zeros((len(self.edfmats),len(indices[n]))) # 2D np array energy vs pixels in current roi for m in range(len(self.edfmats)): # each energy point along the scan for l in range(len(indices[n])): # each pixel on the detector roidata[m,l] = self.edfmats[m,indices[n][l][0],indices[n][l][1]] data.append(roidata) # list which contains one array (energy point, pixel) per ROI self.signals_pw = data
[docs] def get_eloss_pw(self): """ Finds the center of mass for each pixel in each ROI, sets the energy loss scale in and interpolates the signals to a common energy loss scale. Finds the resolution (FWHM) for each pixel. """ if not self.scantype == 'elastic': # return, if scantype is not elastic print( 'This method is meant for elastic line scans only!' ) return if not self.signals_pw: # return, if there is no data print( 'Please use the applyrois_pw function first!' ) return else: cenom_pw = [] resolution_pw = [] for roiind in range(len(self.signals_pw)): # each ROI oneroi_cenom = [] oneroi_resolution = [] for pixelind in range(len(self.signals_pw[roiind])): # each pixel in the ROI oneroi_cenom.append(xrs_utilities.find_center_of_mass(self.energy,self.signals_pw[roiind][:,pixelind])) try: FWHM,x0 = xrs_utilities.fwhm((self.energy - oneroi_cenom[roiind][pixelind])*1e3,self.signals_pw[roiind][:,pixelind]) oneroi_resolution.append(FWHM) except: oneroi_resolution.append(0.0) cenom_pw.append(oneroi_cenom) resolution_pw.append(oneroi_resolution) # define mean of first ROI as 'master' energy loss scale for all Pixels self.eloss = (self.energy - np.mean(cenom_pw[0]))*1e3 # average energy loss in eV from first ROI
# !!!! mayb it is better to just save the CENOM values and to the interpolation later with the stiched # !!!! whole spectrum... # define eloss-scale for each ROI and interpolate onto the 'master' eloss-scale #for roiind in range(len(self.signals_pw)): # each ROI # oneroi_signals = np.zeros_like(self.signals_pw[roiind]) # for pixelind in range(len(self.signals_pw[roiind])): # x = (self.energy-self.cenom_pw[roiind][pixelind])*1e3 # y = # f = interp1d(x,y, bounds_error=False,fill_value=0.0) # self.signals[:,n] = f(self.eloss) #self.signals_pw_interp = []
[docs] def get_type(self): return self.scantype
[docs] def get_scannumber(self): return self.number
[docs] def get_shape(self): if not np.any(self.signals): print( 'please apply the ROIs first.' ) return else: return np.shape(self.signals)
[docs] def get_numofrois(self): if not self.signals.any(): print( 'please apply the ROIs first.' ) return else: return np.shape(self.signals)[1]
class scangroup:
    """ Container for the sum of several scans of the same type. """
    def __init__(self,energy,signals,errors,grouptype='generic'):
        self.energy       = energy    # common energy scale
        self.eloss        = []        # energy-loss scale, filled later
        self.signals      = signals   # summed signals
        self.errors       = errors    # Poisson errors
        self.grouptype    = grouptype # scan-type keyword shared by all member scans
        self.signals_orig = signals   # uninterpolated copy of the signals
[docs] def get_type(self): return self.grouptype
[docs] def get_meanenergy(self): return np.mean(self.energy)
[docs] def get_estart(self): return self.energy[0]
[docs] def get_eend(self): return self.energy[-1]
[docs] def get_meanegridspacing(self): return np.mean(np.diff(self.energy))
[docs] def get_maxediff(self): return (self.energy[-1]-self.energy[0])
class Scan_group:
    """ Container class holding information from a group of scans. """
    def __init__(self, energy, signals, errors, group_type='generic'):
        self.energy      = energy      # common energy scale
        self.eloss       = []          # common energy loss scale
        self.signals     = signals     # summed up signals
        self.errors      = errors      # Poisson errors
        # bug fix: the parameter is named 'group_type'; the old reference to
        # the undefined name 'grouptype' raised a NameError on every construction
        self.grouptype   = group_type  # keyword common to all scans
        self.raw_signals = {}          # per-ROI pixel-wise signals
        self.raw_errors  = {}          # per-ROI pixel-wise Poisson errors
[docs] def get_type(self): return self.grouptype
[docs] def get_meanenergy(self): return np.mean(self.energy)
[docs] def get_estart(self): return self.energy[0]
[docs] def get_eend(self): return self.energy[-1]
[docs] def get_meanegridspacing(self): return np.mean(np.diff(self.energy))
[docs] def get_maxediff(self): return (self.energy[-1]-self.energy[0])
class RC_interp_functor:
    """ Callable that evaluates an interpolated rocking curve.

    RC is a pair (angles_in_rad, intensities). Calling the functor evaluates
    the curve at motor positions X (in degrees), shifted by `shift` and
    scaled by `factor` -- suitable as a curve_fit model.
    """
    def __init__(self, RC ) :
        self.RC = RC
    def __call__(self, X, shift, factor):
        theta = (X - shift)*np.pi/180.0  # degrees -> radians
        return factor * np.interp( theta, self.RC[0], self.RC[1] )
class offDiaDataSet:
    """ **offDiaDataSet**
    Container for an off-diagonal (rocking-curve) dataset.
    """
    def __init__(self):
        # raw matrices (energy point x rocking-curve motor position)
        self.RCmonitor    = np.array([])
        self.signalMatrix = np.array([])
        self.errorMatrix  = np.array([])
        self.motorMatrix  = np.array([])
        self.I0Matrix     = np.array([])
        # axes and bookkeeping
        self.energy     = np.array([])
        self.eloss      = np.array([])
        self.ROI_number = 0
        # scattering geometry vectors
        self.G_vector = np.array([])
        self.q0     = np.array([])
        self.qh     = np.array([])
        self.k0     = np.array([])
        self.kh     = np.array([])
        self.kprime = np.array([])
        # aligned results (filled by the alignRCmonitor* methods)
        self.alignedSignalMatrix = np.array([])
        self.alignedErrorMatrix  = np.array([])
        self.alignedRCmonitor    = np.array([])
        self.masterRCmotor = np.array([])
[docs] def save_hdf5( self, fname ): if isinstance(fname, h5py.Group): f=fname else: f = h5py.File(fname, "w") attrs = ["RCmonitor","signalMatrix","errorMatrix","motorMatrix","I0Matrix","energy","eloss", "ROI_number","G_vector","q0","qh","k0","kh","kprime","alignedSignalMatrix","alignedErrorMatrix","alignedRCmonitor","masterRCmotor","RC_fit"] for attr in attrs: if hasattr(self, attr): f[attr] = eval( 'self.' + attr ) if not isinstance(fname, h5py.Group): f.flush() f.close() f=None
[docs] def load_hdf5( self, fname ): doclose = False if isinstance(fname, h5py.Group): f=fname else: doclose = True f = h5py.File(fname, "r") attrs = ["RCmonitor","signalMatrix","errorMatrix","motorMatrix","I0Matrix","energy","eloss", "ROI_number","G_vector","q0","qh","k0","kh","kprime","alignedSignalMatrix","alignedErrorMatrix","alignedRCmonitor","masterRCmotor"] for attr in attrs : setattr( self , attr ,f[attr][()]) if doclose : f.close()
[docs] def filterDetErrors(self,threshold=3000000): print( "threshold ", threshold) inds = np.where(self.signalMatrix >= threshold) print(" ne correggo ", inds) for ii in range(len(inds[0])): self.signalMatrix[inds[0][ii], inds[1][ii]] = 0.0 self.signalMatrix[inds[0][ii], inds[1][ii]] = np.interp(inds[0][ii], [inds[0][ii]-1,inds[0][ii]+1] , [self.signalMatrix[inds[0][ii], inds[1][ii]-1], self.signalMatrix[inds[0][ii], inds[1][ii]+1]] )
[docs] def normalizeSignals(self): raise Exception("Better to call normalizeRC because signalMatrix and errorMatrix come already normalized from the Scan class" )
    def normalizeRC(self):
        """ Normalize the rocking-curve monitor by the I0 matrix.

        NOTE(review): the in-place operations on signalMatrix and
        errorMatrix divide by 1.0 and multiply by np.mean(1.0) (= 1.0),
        i.e. they are effectively no-ops -- presumably leftovers of an
        earlier I0 normalization; confirm before cleaning up. Only
        RCmonitor is actually rescaled (divided point-wise by I0, then
        multiplied by the mean I0).
        """
        self.signalMatrix /= 1.0
        self.errorMatrix /= 1.0
        self.RCmonitor /= self.I0Matrix
        self.signalMatrix *= np.mean(1.0)
        self.errorMatrix *= np.mean(1.0)
        self.RCmonitor *= np.mean(self.I0Matrix)
    def alignRCmonitor(self, RCcalc):
        """ Align all rocking curves onto a common motor axis via curve fits.

        Fits an interpolated theoretical rocking curve (RCcalc, a pair of
        (angles_in_rad, intensities)) to every measured RC-monitor row to
        find its angular offset and amplitude, then interpolates the signal,
        error, and monitor matrices onto the master motor axis defined by
        row 10 (hard-coded -- TODO confirm this choice).

        Returns the total squared fit residual (error_tot), or None if no
        RC-monitor data has been loaded yet.
        """
        # check if data exists
        if not np.any(self.RCmonitor):
            print('Please load some data first.')
            return
        #x = np.arange(len(self.motorMatrix.T[:,0]))
        #RCmonitor = self.RCmonitor#[:,diagonal_inds[0]:-diagonal_inds[1]]
        #motorMatrix = self.motorMatrix#[:,diagonal_inds[0]:-diagonal_inds[1]]
        #signalMatrix= self.signalMatrix#[:,diagonal_inds[0]:-diagonal_inds[1]]
        RCposition = []   # fitted angular offset per energy row
        RCmax = []        # peak monitor intensity per row (for amplitude norm.)
        RC_fit = []       # fitted model curve per row
        error_tot = 0.0   # accumulated squared residuals over all rows
        for ii in range(len(self.RCmonitor)):
            x = self.motorMatrix[ii,:]
            y = self.RCmonitor[ii,:]
            if(len(x)>2) :
                assert( RCcalc is not None)
                # (earlier Pearson7-based fit kept for reference)
                # guess = [x[np.where(y == np.amax(y))[0]][0], 0.01, 1.0, np.amax(y), 1.]
                # popt, pcov = optimize.curve_fit(math_functions.pearson7_forcurvefit, x, y,p0=guess)
                # RCposition.append(popt[0])
                # RC_fit.append( math_functions.pearson7_forcurvefit( x, *popt ) )
                # RCmax.append(np.amax(y))
                # np.savetxt( "RC.txt", np.array([ RCcalc[0]*180.0/np.pi, RCcalc[1] ]).T)
                # np.savetxt( "xy.txt", np.array([ x, self.RCmonitor[10,:] ]).T)
                # initial guess: offset between measured and calculated maxima
                # (degrees) and the amplitude ratio of the two curves
                guess = [ x[np.argmax( self.RCmonitor[ii,:])] - RCcalc[0][np.argmax(RCcalc[1])]*180.0/np.pi , np.amax(y)/np.amax( RCcalc[1] ) ]
                myfunct = RC_interp_functor( RCcalc )
                popt, pcov = optimize.curve_fit( myfunct , x, y,p0=guess)
                RCposition.append(popt[0])
                simu = myfunct( x, *popt )
                diff = y - simu
                error_tot += ( diff*diff ) .sum()
                RC_fit.append( simu )
                RCmax.append(np.amax(y))
            else:
                # degenerate row (<= 2 points): no fit possible
                assert( RCcalc is None)
                RCposition.append(0.0)
                RCmax.append(y.mean())
        RCmax = np.array(RCmax)
        RCmax /= np.mean(RCmax)
        # possibly correct for deviations from Bragg's law
        #RCfit = np.polyval(np.polyfit(self.energy,self.RCposition),self.energy)
        #for ii in range(len(RCfit)):
        # row 10 defines the master motor axis -- TODO confirm hard-coded index
        master_phi = self.motorMatrix[10,:]-RCposition[10] # 10!!!!!!
        signalMatrix = np.zeros((len(self.energy),len(master_phi)))
        errorMatrix = np.zeros((len(self.energy),len(master_phi)))
        RCmonitor = np.zeros((len(self.energy),len(master_phi)))
        # shift every row by its fitted offset, interpolate onto the master
        # axis, and normalize by the relative peak amplitude
        for ii in range(len(self.energy)):
            signalMatrix[ii,:] = np.interp(master_phi,self.motorMatrix[ii,:]-RCposition[ii],self.signalMatrix[ii,:])/RCmax[ii]
            errorMatrix[ii,:] = np.interp(master_phi,self.motorMatrix[ii,:]-RCposition[ii],self.errorMatrix[ii,:] )/RCmax[ii]
            RCmonitor[ii,:] = np.interp(master_phi,self.motorMatrix[ii,:]-RCposition[ii],self.RCmonitor[ii,:] )/RCmax[ii]
        self.alignedSignalMatrix = signalMatrix
        self.alignedErrorMatrix = errorMatrix
        self.masterRCmotor = master_phi
        self.alignedRCmonitor = RCmonitor
        self.RC_fit = RC_fit
        return error_tot
    def alignRCmonitor2(self):
        """ Align all rocking curves using their center of mass.

        Like alignRCmonitor, but each row's angular offset is taken as the
        center of mass of its RC monitor instead of a curve fit. Signal,
        error, and monitor matrices are interpolated onto the master motor
        axis of row 10 (hard-coded -- TODO confirm).
        """
        # check if data exists
        if not np.any(self.RCmonitor):
            print('Please load some data first.')
            return
        #x = np.arange(len(self.motorMatrix.T[:,0]))
        #RCmonitor = self.RCmonitor#[:,diagonal_inds[0]:-diagonal_inds[1]]
        #motorMatrix = self.motorMatrix#[:,diagonal_inds[0]:-diagonal_inds[1]]
        #signalMatrix= self.signalMatrix#[:,diagonal_inds[0]:-diagonal_inds[1]]
        RCposition = []  # per-row center of mass of the RC monitor
        for ii in range(len(self.RCmonitor)):
            x = self.motorMatrix[ii,:]
            y = self.RCmonitor[ii,:]
            # (earlier Pearson7-fit variant kept for reference)
            #try:
            #   guess = [x[np.where(y == np.amax(y))[0]][0], 0.01, 1.0, np.amax(y), 1.]
            #   popt, pcov = optimize.curve_fit(math_functions.pearson7_forcurvefit, x, y,p0=guess)
            #   RCposition.append(popt[0])
            #except:
            RCposition.append(xrs_utilities.find_center_of_mass(x,y))
        # possibly correct for deviations from Bragg's law
        #RCfit = np.polyval(np.polyfit(self.energy,self.RCposition),self.energy)
        #for ii in range(len(RCfit)):
        # row 10 defines the master motor axis -- TODO confirm hard-coded index
        master_phi = self.motorMatrix[10,:]-RCposition[10]
        signalMatrix = np.zeros((len(self.energy),len(master_phi)))
        errorMatrix = np.zeros((len(self.energy),len(master_phi)))
        RCmonitor = np.zeros((len(self.energy),len(master_phi)))
        # shift every row by its CENOM and interpolate onto the master axis
        for ii in range(len(self.energy)):
            signalMatrix[ii,:] = np.interp(master_phi,self.motorMatrix[ii,:]-RCposition[ii],self.signalMatrix[ii,:])
            errorMatrix[ii,:] = np.interp(master_phi,self.motorMatrix[ii,:]-RCposition[ii],self.errorMatrix[ii,:])
            RCmonitor[ii,:] = np.interp(master_phi,self.motorMatrix[ii,:]-RCposition[ii],self.RCmonitor[ii,:])
        self.alignedSignalMatrix = signalMatrix
        self.alignedErrorMatrix = errorMatrix
        self.masterRCmotor = master_phi
        self.alignedRCmonitor = RCmonitor
    def alignRCmonitorCC(self,repeat=2):
        """ **alignRCmonitorCC**
        Use cross-correlation to align data matrix according to the Rockin-Curve monitor.

        The first row's monitor is the reference; every other row is shifted
        by the lag that maximizes its cross-correlation with the reference,
        then optionally refined in `repeat` further passes against the
        already-aligned monitor.
        """
        # check if data exists
        if not np.any(self.RCmonitor):
            print('Please load some data first.')
            return
        signalMatrix = np.zeros((len(self.energy),len(self.motorMatrix[0,:])))
        errorMatrix = np.zeros((len(self.energy),len(self.motorMatrix[0,:])))
        RCmonitor = np.zeros((len(self.energy),len(self.motorMatrix[0,:])))
        # first iteration: align every row against row 0
        for ii in range(len(self.RCmonitor)):
            x0 = self.RCmonitor[0,:]
            x = self.RCmonitor[ii,:]
            y = np.correlate(x0,x,mode='same')
            # lag of maximum correlation with the reference row
            ind = np.where(y == np.amax(y))[0]
            if ii == 0:
                # the reference row defines the master motor axis
                master_phi = self.motorMatrix[ii,:] - self.motorMatrix[ii,ind]
            signalMatrix[ii,:] = np.interp(master_phi,self.motorMatrix[ii,:]-self.motorMatrix[ii,ind],self.signalMatrix[ii,:])
            errorMatrix[ii,:] = np.interp(master_phi,self.motorMatrix[ii,:]-self.motorMatrix[ii,ind],self.errorMatrix[ii,:])
            RCmonitor[ii,:] = np.interp(master_phi,self.motorMatrix[ii,:]-self.motorMatrix[ii,ind],self.RCmonitor[ii,:])
        # further iterations: refine against the aligned monitor
        if repeat:
            for jj in range(repeat):
                for ii in range(len(RCmonitor)):
                    x0 = RCmonitor[0,:]
                    x = RCmonitor[ii,:]
                    y = np.correlate(x0,x,mode='same')
                    ind = np.where(y == np.amax(y))[0]
                    # NOTE(review): the refinement re-interpolates from the
                    # ORIGINAL (unaligned) matrices, not the aligned ones --
                    # confirm this is intended
                    signalMatrix[ii,:] = np.interp(master_phi,master_phi-master_phi[ind],self.signalMatrix[ii,:])
                    errorMatrix[ii,:] = np.interp(master_phi,master_phi-master_phi[ind],self.errorMatrix[ii,:])
                    RCmonitor[ii,:] = np.interp(master_phi,master_phi-master_phi[ind],self.RCmonitor[ii,:])
        self.alignedSignalMatrix = signalMatrix
        self.alignedErrorMatrix = errorMatrix
        self.masterRCmotor = master_phi
        self.alignedRCmonitor = RCmonitor
[docs] def deglitchSignalMatrix(self,startpoint,stoppoint,threshold): signalMatrix = self.alignedSignalMatrix for ii in range(signalMatrix.shape[1]): ind = np.where(signalMatrix[startpoint:stoppoint,ii] >= threshold)[0] if np.any(ind): signalMatrix[startpoint+ind, ii] = (signalMatrix[startpoint+ind-1,ii] + signalMatrix[startpoint+ind+1,ii] )/2.0 self.alignedSignalMatrix = signalMatrix
[docs] def interpolateMatrix(self,master_matrix,master_energy,master_RCmotor): from scipy import interpolate signalMatrix = self.alignedSignalMatrix y=self.energy x=self.masterRCmotor xx, yy = np.meshgrid(x, y) z = signalMatrix f = interpolate.interp2d(x, y, z, kind='cubic') ynew = master_energy xnew = master_RCmotor znew = f(xnew, ynew) self.interpSignalMatrix = znew self.interpEnergy = ynew self.interpRCmotor = xnew
[docs] def removeElastic(self,fitrange=[-6.0,2.0]): self.energy = np.array(self.energy) cenom = [] for ii in range(self.signalMatrix.shape[1]): inds = np.where(np.logical_and(np.array(self.energy) >= fitrange[0], np.array(self.energy)<= fitrange[1]))[0] x = self.energy[inds] y = self.alignedSignalMatrix[inds,ii] guess = [x[np.where(y==np.amax(y))[0]], 0.002, 100.0, np.amax(y) ,0.0] fitfct = lambda a: np.sum( (y - math_functions.pearson7(x,a) )**2.0 ) params = optimize.minimize(fitfct, guess, method='SLSQP').x cenom.append(params[0]) back = math_functions.pearson7(self.energy,params ) plt.ion() plt.cla() plt.plot(self.energy,self.alignedSignalMatrix[:,ii],self.energy,back,self.energy,self.alignedSignalMatrix[:,ii]-back) plt.waitforbuttonpress() self.alignedSignalMatrix[:,ii] = self.alignedSignalMatrix[:,ii]-back self.eloss = (self.energy - np.mean(cenom))*1.0e3
[docs] def removeElastic2(self, fitrange1, fitrange2, guess=None): """ **removeElastic2** Subtract Pearson7 plus linear. """ self.alignedSignalMatrixB = np.zeros_like(self.alignedSignalMatrix) self.energy = np.array(self.energy) cenom = [] for ii in range(self.signalMatrix.shape[1]): region1 = np.where(np.logical_and(np.array(self.energy) >= fitrange1[0], np.array(self.energy) <= fitrange1[1]))[0] region2 = np.where(np.logical_and(np.array(self.energy) >= fitrange2[0], np.array(self.energy) <= fitrange2[1]))[0] inds = np.append(region1,region2) x = self.energy[inds] y = self.alignedSignalMatrix[inds,ii] if not guess: guess = [x[np.where(y==np.amax(y))[0]], 0.001, 1.0, np.amax(y) ,0.0, -0.1, 0.01] fitfct = lambda a: np.sum( (y - math_functions.pearson7_linear(x,a) )**2.0 ) params = optimize.minimize(fitfct, guess, method='COBYLA',tol=1e-20).x cenom.append(params[0]) back = math_functions.pearson7_linear(self.energy,params ) plt.ion() plt.cla() plt.plot(self.energy,self.alignedSignalMatrix[:,ii],self.energy,back,self.energy,self.alignedSignalMatrix[:,ii]-back) plt.waitforbuttonpress() self.alignedSignalMatrixB[:,ii] = self.alignedSignalMatrix[:,ii]-back self.eloss = (self.energy - np.mean(cenom))*1.0e3
[docs] def windowSignalMatrix(self,estart,estop): inds = np.where(np.logical_and(self.eloss>=estart, self.eloss<= estop))[0] self.alignedSignalMatrix = self.alignedSignalMatrix[inds[0]:inds[-1],:]
[docs] def removeLinearBack(self,fitrange1,fitrange2): self.energy = np.array(self.energy) for ii in range(self.signalMatrix.shape[1]): region1 = np.where(np.logical_and(np.array(self.energy) >= fitrange1[0], np.array(self.energy) <= fitrange1[1])) region2 = np.where(np.logical_and(np.array(self.energy) >= fitrange2[0], np.array(self.energy) <= fitrange2[1])) region = np.append(region1,region2) x = np.array(self.energy[region]) y = self.alignedSignalMatrix[region,ii] back = np.polyval(np.polyfit(x,y,1),self.energy) self.alignedSignalMatrix[:,ii] -= back
[docs] def removeConstBack(self,fitrange1,fitrange2): self.energy = np.array(self.energy) for ii in range(self.signalMatrix.shape[1]): region1 = np.where(np.logical_and(np.array(self.energy) >= fitrange1[0], np.array(self.energy) <= fitrange1[1])) region2 = np.where(np.logical_and(np.array(self.energy) >= fitrange2[0], np.array(self.energy) <= fitrange2[1])) region = np.append(region1,region2) x = np.array(self.energy[region]) y = self.alignedSignalMatrix[region,ii] back = np.polyval(np.polyfit(x,y,0),self.energy) self.alignedSignalMatrix[:,ii] -= back
[docs] def removePearsonBack(self,fitrange1,fitrange2): self.energy = np.array(self.energy) for ii in range(self.signalMatrix.shape[1]): region1 = np.where(np.logical_and(np.array(self.energy) >= fitrange1[0], np.array(self.energy) <= fitrange1[1])) region2 = np.where(np.logical_and(np.array(self.energy) >= fitrange2[0], np.array(self.energy) <= fitrange2[1])) region = np.append(region1,region2) x = np.array(self.energy[region]) y = self.alignedSignalMatrix[region,ii] guess = [x[np.where(y==np.amax(y))[0]], 0.002, 100.0, np.amax(y) ,0.0] fitfct = lambda a: np.sum( (y - math_functions.pearson7(x,a) )**2.0 ) params = optimize.minimize(fitfct, guess, method='SLSQP').x back = math_functions.pearson7(self.energy,params) self.alignedSignalMatrix[:,ii] -= back
[docs] def replaceSignalByConstant(self,fitrange): self.energy = np.array(self.energy) for ii in range(self.signalMatrix.shape[1]): inds = np.where(np.logical_and(np.array(self.energy) >= fitrange[0], np.array(self.energy) <= fitrange[1]))[0] x = np.array(self.energy[inds]) y = self.alignedSignalMatrix[inds,ii] back = np.polyval(np.polyfit(x,y,0),self.energy) self.alignedSignalMatrix[:,ii] = back
def findgroups(scans):
    """ Group scan instances by their scan-type attribute.

    scans : dict of scan objects keyed by name; the digits embedded in each
            key define a numeric sort order.
    Returns a list of lists of scans, one per scan type, ordered by the
    type keyword. (Quirk preserved: the reference length used for the
    length-mismatch warning is never updated, so the warning never fires.)
    """
    allscannames = []
    for scan in scans:
        print( scan )
        allscannames.append(scan)
    # numeric sort on the digits embedded in the key (e.g. 'Scan012')
    allscannames.sort( key = lambda name: int(''.join(filter(str.isdigit, str(name) ))) )
    allscans = [ scans[name] for name in allscannames ]
    allscans = sorted(allscans, key=lambda s: s.get_type())
    rawgroups = []
    print( 'The following scangroups were found:' )
    for key, members in groupby(allscans, lambda s: s.get_type()):
        print( key )
        members = list(members)
        expected_len = -1
        for member in members:
            if expected_len != -1 and len(member.monitor) != expected_len:
                print( " Scan Number :" +str( member.scan_number )+" is added to group of key " +key + " but has lenght "+ str(len(member.monitor)) + " versus " + str( expected_len) )
        rawgroups.append(members)
    return rawgroups
def makegroup(groupofscans,grouptype=None):
    """ Sum a list of scan instances into one scangroup.

    All scans are interpolated onto the energy grid of the first scan,
    monitor-weighted, summed, and renormalized by the summed monitor;
    Poisson errors are estimated as sqrt of the summed counts.

    groupofscans : list of scan instances of the same type
    grouptype    : optional type keyword; defaults to the first scan's type
    Returns an instance of the scangroup class.
    """
    # bug fix: 'interpolate' was never imported at module level (only
    # optimize and Rbf are imported from scipy) -> NameError at runtime
    from scipy import interpolate
    if not grouptype:
        grouptype = groupofscans[0].get_type() # the type of the sum of scans will be the same as the first from the list
    theenergy   = groupofscans[0].energy # all scans are splined onto energy grid of the first scan
    thesignals  = np.zeros(groupofscans[0].get_shape())
    theerrors   = np.zeros(groupofscans[0].get_shape())
    themonitors = np.zeros(np.shape(groupofscans[0].monitor))
    for scan in groupofscans:
        f = interpolate.interp1d(scan.energy,scan.monitor, bounds_error=False, fill_value=0.0)
        moni = f(theenergy)
        themonitors += moni
        for n in range(thesignals.shape[-1]):
            f = interpolate.interp1d(scan.energy,scan.signals[:,n], bounds_error=False, fill_value=0.0)
            signal = f(theenergy)
            thesignals[:,n] += signal*moni
    for n in range(thesignals.shape[-1]):
        theerrors[:,n] = np.sqrt(thesignals[:,n])
    # and normalize
    for n in range(thesignals.shape[-1]):
        thesignals[:,n] = thesignals[:,n]/themonitors
        theerrors[:,n] = theerrors[:,n]/themonitors
    group = scangroup(theenergy,thesignals,theerrors,grouptype)
    return group
def makegroup_nointerp(groupofscans,grouptype=None,absCounts=False):
    """ Sum a list of scan instances into one scangroup without interpolation.

    Unlike makegroup, the energy axes are assumed to match point-by-point
    (no spline onto a common grid), which avoids encoder-difference
    artifacts. May need a "linspace" step in case the energy scale is not
    monotonous.

    groupofscans : list of scan instances of the same type
    grouptype    : optional type keyword; defaults to the first scan's type
    absCounts    : kept for interface compatibility (unused here)
    Returns an instance of the scangroup class.
    """
    if not grouptype:
        grouptype = groupofscans[0].get_type()
    theenergy   = groupofscans[0].energy
    thesignals  = np.zeros(groupofscans[0].get_shape())
    theerrors   = np.zeros(groupofscans[0].get_shape())
    themonitors = np.zeros(np.shape(groupofscans[0].monitor))
    # monitor-weighted sum over all scans
    for onescan in groupofscans:
        themonitors += onescan.monitor
        for col in range(thesignals.shape[-1]):
            thesignals[:,col] += onescan.signals[:,col]*onescan.monitor
    # Poisson errors from the summed raw counts
    theerrors = np.sqrt(thesignals)
    # renormalize by the summed monitor
    for col in range(thesignals.shape[-1]):
        thesignals[:,col] /= themonitors
        theerrors[:,col]  /= themonitors
    return scangroup(theenergy,thesignals,theerrors,grouptype)
def make_scan_group_sum( group_of_scans, group_type=None, abs_counts=False):
    """ **make_scan_group_sum**
    Sums together a list of scans with equal scan_type into a Scan_group.

    Args:
        group_of_scans (list): Instances of the Scan class to be summed up.
        group_type (str): Type keyword; if None (default) the type of the
            first scan in the list is used.
        abs_counts (boolean): If True, results in absolute count units
            (ct/s); kept for interface compatibility (unused here).

    Returns:
        group (obj): Instance of the Scan_group container class.
    """
    if not group_type:
        group_type = group_of_scans[0].get_type()
    energy   = group_of_scans[0].energy
    signals  = np.zeros( group_of_scans[0].get_shape() )
    errors   = np.zeros( group_of_scans[0].get_shape() )
    monitors = np.zeros( np.shape(group_of_scans[0].monitor) )
    # monitor-weighted sum over all scans in the list
    for one_scan in group_of_scans:
        monitors += one_scan.monitor
        for col in range(signals.shape[-1]):
            signals[:,col] += one_scan.signals[:,col]*one_scan.monitor
    # Poisson errors from the summed raw counts
    errors = np.sqrt(signals)
    # renormalize by the summed monitor signal
    for col in range(signals.shape[-1]):
        signals[:,col] /= monitors
        errors[:,col]  /= monitors
    return Scan_group( energy, signals, errors, group_type )
def make_scan_group_pixel( group_of_scans, group_type=None, abs_counts=False ):
    """ **make_scan_group_pixel**
    Pixel-by-pixel summation of a list of scans with equal scan_type.

    Args:
        group_of_scans (list): Instances of the Scan class to be summed up.
        group_type (str): Type keyword; if None (default) the type of the
            first scan in the list is used.
        abs_counts (boolean): If True, results in absolute count units
            (ct/s); kept for interface compatibility (unused here).

    Returns:
        group (obj): Instance of the Scan_group container class.
    """
    # if not provided, type of returned group will be same as first in group_of_scans
    if not group_type:
        group_type = group_of_scans[0].get_type()
    energy = group_of_scans[0].energy
    # bug fix: was np.shape(groupofscans[0].monitor) -- 'groupofscans' is
    # undefined here (the parameter is 'group_of_scans') -> NameError
    monitors = np.zeros(np.shape(group_of_scans[0].monitor))
    raw_signals = {}
    raw_errors  = {}
    for key in group_of_scans[0].raw_signals:
        raw_signals[key] = np.zeros_like(group_of_scans[0].raw_signals[key])
        raw_errors[key]  = np.zeros_like(group_of_scans[0].raw_errors[key])
    # monitor-weighted pixel-by-pixel sum over all scans
    for scan in group_of_scans:
        monitors += scan.monitor
        for key in scan.raw_signals:
            print(scan.raw_signals[key].shape, scan.monitor.shape)
            raw_signals[key] += scan.raw_signals[key]*scan.monitor
    # Poisson errors from the summed raw counts
    for key in raw_signals:
        raw_errors[key] = np.sqrt(raw_signals[key])
    # renormalize by the summed monitor signal
    for key in raw_signals:
        raw_signals[key] /= monitors
        raw_errors[key]  /= monitors
    # create the group
    group = Scan_group( energy, np.array([]), np.array([]), group_type )
    group.raw_signals = raw_signals
    group.raw_errors  = raw_errors
    return group
def append2Scan_right(group1,group2,inds=None,grouptype='spectrum'):
    """ Append group2 to the right (higher energies) of group1.

    Both arguments are scangroup instances. If inds is given, only those
    rows of group2 are appended. Returns a new scangroup carrying group1's
    grouptype.
    """
    # assert isinstance(group1,scangroup) and isinstance(group2,scangroup)
    energy  = group1.energy
    signals = group1.signals
    errors  = group1.errors
    gtype   = group1.grouptype
    if inds:
        energy  = np.append(energy, group2.energy[inds])
        signals = np.append(signals, np.squeeze(group2.signals[inds,:]),0)
        errors  = np.append(errors, np.squeeze(group2.errors[inds,:]),0)
    else:
        energy  = np.append(energy, group2.energy)
        signals = np.append(signals, np.squeeze(group2.signals),0)
        errors  = np.append(errors, np.squeeze(group2.errors),0)
    return scangroup(energy,signals,errors,grouptype=gtype)
def append2Scan_left(group1,group2,inds=None,grouptype='spectrum'):
    """ Append group1 to the left (lower energies) of group2.

    Both arguments are scangroup instances. If inds is given, only those
    rows of group1 are used. Returns a new scangroup carrying group1's
    grouptype.
    """
    assert isinstance(group1,scangroup) and isinstance(group2,scangroup)
    if not inds:
        energy  = group1.energy
        signals = group1.signals
        errors  = group1.errors
    else:
        energy  = group1.energy[inds]
        signals = group1.signals[inds,:]
        errors  = group1.errors[inds,:]
    gtype   = group1.grouptype
    energy  = np.append(energy, group2.energy)
    signals = np.append(signals, np.squeeze(group2.signals),0)
    errors  = np.append(errors, np.squeeze(group2.errors),0)
    return scangroup(energy,signals,errors,grouptype=gtype)
def insertScan(group1,group2,grouptype='spectrum'):
    """ Insert group2 into group1 at the matching energy window.

    Rows of group1 below/above group2's energy range are kept and group2 is
    spliced in between. Returns a new scangroup.
    NOTE: numpy.insert might do this more directly.
    """
    below = np.where(group1.energy < group2.get_estart())
    above = np.where(group1.energy > group2.get_eend())
    energy  = np.append(group1.energy[below], np.append(group2.energy, group1.energy[above]))
    signals = np.append(np.squeeze(group1.signals[below,:]),
                        np.append(group2.signals, np.squeeze(group1.signals[above,:]),0),0)
    errors  = np.append(np.squeeze(group1.errors[below,:]),
                        np.append(group2.errors, np.squeeze(group1.errors[above,:]),0),0)
    return scangroup(energy,signals,errors,grouptype)
def catScansLong(groups,include_elastic):
    """ Build a spectrum by inserting all other scan groups into the 'long'
    overview scan.

    Additional groups whose type contains 'long' are inserted first, then
    all remaining groups, each ordered by start energy. If an 'elastic'
    group exists and include_elastic is False, points up to its end energy
    are cut off. Returns (energy, signals, errors).
    """
    # the master long scan
    spectrum = groups['long']
    # groups whose type does not contain 'long', sorted by start energy
    normal_groups = sorted(
        (groups[name] for name in groups if 'long' not in name),
        key=lambda g: g.get_estart())
    # additional 'long'-type groups (other than the master 'long' itself)
    long_groups = sorted(
        (groups[name] for name in groups if 'long' in name and name != 'long'),
        key=lambda g: g.get_estart())
    # insert other long scans first, then all remaining groups
    for g in long_groups:
        spectrum = insertScan(spectrum,g)
    for g in normal_groups:
        spectrum = insertScan(spectrum,g)
    # cut off the elastic line if present in the groups
    if 'elastic' in groups and not include_elastic:
        inds = np.where(spectrum.energy > groups['elastic'].get_eend())[0]
        return spectrum.energy[inds], spectrum.signals[inds,:], spectrum.errors[inds,:]
    else:
        return spectrum.energy, spectrum.signals, spectrum.errors
def catScans(groups, include_elastic):
    """ Concatenate all scans in groups.

    Returns the appended energy, signals, and errors.
    """
    # order all groups by ascending start energy
    ordered = sorted(groups.values(), key=lambda g: g.get_estart())

    # seed the spectrum (a scangroup instance) with the lowest-energy group
    first = ordered[0]
    spectrum = scangroup(first.energy, first.signals, first.errors,
                         grouptype='spectrum')

    # append the remaining groups on the high-energy side
    for g in ordered[1:]:
        spectrum = append2Scan_right(spectrum, g)

    # cut off the elastic line if present in the groups
    if 'elastic' in groups and not include_elastic:
        inds = np.where(spectrum.energy > groups['elastic'].get_eend())[0]
        return spectrum.energy[inds], spectrum.signals[inds, :], spectrum.errors[inds, :]
    return spectrum.energy, spectrum.signals, spectrum.errors
def catScans_pixel( groups, include_elastic ):
    """ **catScans_pixel**

    Stitch together all scans in groups in a pixel-by-pixel fashion for
    the case of no available long (overview) scan.

    Args:
        groups (list): List of scan-groups to be stitched together.
        include_elastic (boolean): Boolean switch if the elastic line
            should be included in the final spectrum or not.

    Returns:
        energy (np.array): Array of energy loss values.
        raw_signals (dict): Dictionary (one entry per ROI) of pixel-by-pixel intensities.
        raw_errors (dict): Dictionary (one entry per ROI) of pixel-by-pixel Poisson errors.
    """
    # order all groups by ascending start energy
    ordered = sorted(groups.values(), key=lambda g: g.get_estart())

    # seed the stitched spectrum with the lowest-energy group
    first = ordered[0]
    spectrum = Scan_group(first.energy, np.array([]), np.array([]),
                          group_type='spectrum')
    spectrum.raw_signals = first.raw_signals
    spectrum.raw_errors = first.raw_errors

    # append the remaining groups on the high-energy side
    for g in ordered[1:]:
        spectrum = append2Scan_right_pixel(spectrum, g)

    # cut off the elastic line if present in the groups
    if 'elastic' in groups and not include_elastic:
        inds = np.where(spectrum.energy > groups['elastic'].get_eend())[0]
        return spectrum.energy[inds], spectrum.signals[inds, :], spectrum.errors[inds, :]
    return spectrum.energy, spectrum.signals, spectrum.errors
def appendScans(groups, include_elastic):
    """ Append groups of scans ordered by their first energy value.

    If an overview ('long') scan is present, the other scans are
    inserted into it; otherwise all scans are simply concatenated.
    """
    # a 'long' key selects the refine/insert strategy
    if 'long' in groups:
        print( " going to refine " )
        return catScansLong(groups, include_elastic)
    return catScans(groups, include_elastic)
def appendScans_pixel( groups, include_elastic ):
    """ **appendScans_pixel**

    Decides if there is a long scan available and selects accordingly
    which stitching method to use.

    Args:
        groups (list): List of scan-groups to be stitched together.
        include_elastic (boolean): Boolean switch if the elastic line
            should be included in the final spectrum or not.

    Returns:
        energy (np.array): Array of energy loss values.
        raw_signals (dict): Dictionary (one entry per ROI) of pixel-by-pixel intensities.
        raw_errors (dict): Dictionary (one entry per ROI) of pixel-by-pixel Poisson errors.
    """
    # a 'long' key selects the refine/insert strategy
    if 'long' in groups:
        print( " going to refine " )
        return catScansLong_pixel( groups, include_elastic )
    return catScans_pixel( groups, include_elastic )
def stitch_groups_to_spectrum(groups, method='sum', include_elastic=False ):
    """ **stitch_groups_to_spectrum**

    Takes a dictionary of instances of the Scan class and stitches them
    together to produce a spectrum. Long scans and scans that have 'long'
    in ther scan_type attribute are treated specially.

    Args:
        groups (list): List of instances of the Scan class.
        method (str): Keyword describing the kind of integration scheme
            to be used (possible values are 'sum', 'pixel', or 'row'),
            default is 'sum'.
        include_elastic (boolean): Boolean flag deciding if the elastic
            should be included in the final spectrum.

    Returns:
        Scan class instance with the stitched spectrum.
    """
    # create the final spectrum
    # NOTE(review): assumes Scan() initializes 'energy' to something falsy
    # (e.g. an empty array) so the np.any() test below works — confirm
    spectrum = Scan()

    # find all groups that are not long scans, sort by acending energy
    all_groups = []
    for group in groups:
        if not 'long' in group:
            all_groups.append(groups[group])
    all_groups.sort( key = lambda x:x.get_E_start() )

    # groups that have 'long' in the grouptype
    long_groups = []
    for group in groups:
        # if there is a long scan: copy its data into the spectrum
        # (the overview scan becomes the stitching target)
        if group == 'long':
            spectrum.energy  = groups[group].energy
            spectrum.monitor = groups[group].monitor
            #if method is 'sum':
            #    spectrum.signals = groups[group].signals
            #    spectrum.errors = groups[group].errors
            #elif method is 'pixel' or 'row':
            spectrum.raw_signals = groups[group].raw_signals
            spectrum.raw_errors  = groups[group].raw_errors
            #else:
            #    print('Method \''+method+'\' not supported, use either \'pixel\', \'column\', or \'row\'.')
            #    return
        # if there is a scan that has 'long' in its name
        if 'long' in group and group != 'long':
            long_groups.append(groups[group])
    long_groups.sort( key = lambda x:x.get_E_start() )

    # if long exists, insert backround groups, then other scans
    if np.any(spectrum.energy):
        for group in long_groups:
            spectrum.insert_scan( group, method=method )
        for group in all_groups:
            spectrum.insert_scan( group, method=method )
    # if no long scan exists, just append all others
    else:
        spectrum.energy  = all_groups[0].energy
        spectrum.monitor = all_groups[0].monitor
        #if method is 'sum':
        spectrum.raw_signals = all_groups[0].raw_signals
        spectrum.raw_errors  = all_groups[0].raw_errors
        #elif method is 'pixel' or 'row':
        #    spectrum.raw_signals = all_groups[0].raw_signals
        #    spectrum.raw_errors = all_groups[0].raw_errors
        #else:
        #    print('Method \''+method+'\' not supported, use either \'pixel\', \'column\', or \'row\'.')
        #    return
        for group in all_groups[1::]:
            spectrum.append_scan(group, method=method)

    # cut elastic line if applicable
    if 'elastic' in groups and not include_elastic:
        inds = np.where(spectrum.energy > groups['elastic'].get_E_end())[0]
        spectrum.energy  = spectrum.energy[inds]
        spectrum.monitor = spectrum.monitor[inds]
        # raw data dimensionality depends on the analysis method:
        # 'sum' -> 1D per ROI, 'pixel'/'pixel2' -> 3D, 'row' -> 2D
        for key in spectrum.raw_signals:
            if method == 'sum':
                spectrum.raw_signals[key] = spectrum.raw_signals[key][inds]
                spectrum.raw_errors[key]  = spectrum.raw_errors[key][inds]
            if method in [ 'pixel' , 'pixel2']:
                spectrum.raw_signals[key] = spectrum.raw_signals[key][inds,:,:]
                spectrum.raw_errors[key]  = spectrum.raw_errors[key][inds,:,:]
            if method == 'row':
                spectrum.raw_signals[key] = spectrum.raw_signals[key][inds,:]
                spectrum.raw_errors[key]  = spectrum.raw_errors[key][inds,:]
    return spectrum
def get_XES_spectrum( groups ):
    """ **get_XES_spectrum**

    Constructs a XES spectrum from the given scan groups
    (sums of separate emission scans).

    Args:
        groups (dict): Dictionary of groups with partial XES scans.

    Returns:
        energy, signals, errors of the stitched spectrum.
    """
    # make all groups ascending in energy (energy2 scans may run downwards),
    # collect them, and sort by end-energy (smallest in XES/energy2-scans)
    allgroups = []
    for key in groups:
        g = groups[key]
        if g.energy[0] > g.energy[-1]:
            g.energy = np.flipud( g.energy )
            for ii in range(g.signals.shape[1]):
                g.signals[:,ii] = np.flipud( g.signals[:,ii] )
        allgroups.append(g)
    allgroups.sort(key = lambda x:x.get_E_end())

    # find lowest energy, highest energy, smallest increment, define grid
    eStart = np.amin([ np.amin(g.energy) for g in allgroups ])
    eStop  = np.amax([ np.amax(g.energy) for g in allgroups ])
    eStep  = np.amin(np.abs( [np.diff(g.energy)[0] for g in allgroups] ))
    energy = np.arange(eStart, eStop, eStep)

    # fixed: the original read 'group.signals.shape[1]' where 'group' was
    # the leaked dict-key loop variable (a string) -> AttributeError;
    # use the first group's column count instead
    nCols   = allgroups[0].signals.shape[1]
    signals = np.zeros((len(energy), nCols, len(allgroups)))
    errors  = np.zeros((len(energy), nCols, len(allgroups)))

    # interpolate all groups onto grid, put into a matrix
    for ii, g in enumerate(allgroups):
        for jj in range(nCols):
            signals[:,jj,ii] = np.interp(energy, g.energy, g.signals[:,jj], left=0.0, right=0.0)
            errors[:,jj,ii]  = np.interp(energy, g.energy, g.errors[:,jj],  left=0.0, right=0.0)

    # sum up and weigh by number of available non-zero points;
    # grid points covered by no group are left at zero (fixed: the
    # original divided by nParts == 0 there, producing NaNs/warnings)
    sum_signals = np.zeros( (len(energy), nCols) )
    sum_errors  = np.zeros( (len(energy), nCols) )
    for jj in range(nCols):
        for ii in range(len(energy)):
            nParts = len(np.where(signals[ii,jj] > 0.0)[0])
            if nParts > 0:
                sum_signals[ii,jj] = np.sum(signals[ii,jj])/nParts
                sum_errors[ii,jj]  = np.sqrt(np.sum(errors[ii,jj]**2.0))/nParts

    spectrum = scangroup(energy, sum_signals, sum_errors, grouptype='spectrum')
    return spectrum.energy, spectrum.signals, spectrum.errors
def catXESScans(groups):
    """ Concatenate all scans in groups, return the appended energy,
    signals, and errors.

    This needs to be a bit smarter to also work for scans that are
    scanned from small to large energy...
    """
    # make all groups ascending in energy (energy2 scans may run downwards),
    # collect them, and sort by end-energy (smallest in XES/energy2-scans)
    allgroups = []
    for key in groups:
        g = groups[key]
        if g.energy[0] > g.energy[-1]:
            g.energy = np.flipud( g.energy )
            for ii in range(g.signals.shape[1]):
                g.signals[:,ii] = np.flipud( g.signals[:,ii] )
        allgroups.append(g)
    allgroups.sort(key = lambda x:x.get_eend())

    # find lowest energy, highest energy, smallest increment, define grid
    eStart = np.amin([ np.amin(g.energy) for g in allgroups ])
    eStop  = np.amax([ np.amax(g.energy) for g in allgroups ])
    eStep  = np.amin(np.abs( [np.diff(g.energy)[0] for g in allgroups] ))
    energy = np.arange(eStart, eStop, eStep)

    # fixed: the original read 'group.signals.shape[1]' where 'group' was
    # the leaked dict-key loop variable (a string) -> AttributeError;
    # use the first group's column count instead
    nCols   = allgroups[0].signals.shape[1]
    signals = np.zeros((len(energy), nCols, len(allgroups)))
    errors  = np.zeros((len(energy), nCols, len(allgroups)))

    # interpolate all groups onto grid, put into a matrix
    for ii, g in enumerate(allgroups):
        for jj in range(nCols):
            signals[:,jj,ii] = np.interp(energy, g.energy, g.signals[:,jj], left=0.0, right=0.0)
            errors[:,jj,ii]  = np.interp(energy, g.energy, g.errors[:,jj],  left=0.0, right=0.0)

    # sum up and weigh by number of available non-zero points;
    # grid points covered by no group are left at zero (fixed: the
    # original divided by nParts == 0 there, producing NaNs/warnings)
    sum_signals = np.zeros( (len(energy), nCols) )
    sum_errors  = np.zeros( (len(energy), nCols) )
    for jj in range(nCols):
        for ii in range(len(energy)):
            nParts = len(np.where(signals[ii,jj] > 0.0)[0])
            if nParts > 0:
                sum_signals[ii,jj] = np.sum(signals[ii,jj])/nParts
                sum_errors[ii,jj]  = np.sqrt(np.sum(errors[ii,jj]**2.0))/nParts

    spectrum = scangroup(energy, sum_signals, sum_errors, grouptype='spectrum')
    return spectrum.energy, spectrum.signals, spectrum.errors
def appendXESScans(groups):
    """ Append groups of XES scans ordered by their first energy value.

    Args:
        groups (dict): Dictionary of groups with partial XES scans.

    Returns:
        energy, signals, errors of the concatenated spectrum.
    """
    # fixed: removed the unused 'grouptypes' local — this function simply
    # delegates to catXESScans
    return catXESScans(groups)
def create_sum_image(scans, scannumbers):
    """ Returns a summed image from all scans with numbers 'scannumbers'.

    scans       = dictionary of objects from the scan-class
    scannumbers = single scannumber, or list of scannumbers from which
                  an image should be constructed
    """
    # make 'scannumbers' iterable (even if it is just an integer)
    numbers = scannumbers if isinstance(scannumbers, list) else [scannumbers]

    # accumulator shaped like a single detector frame
    first_key = 'Scan%03d' % numbers[0]
    image = np.zeros_like(scans[first_key].edfmats[0, :, :])

    # add every frame of every requested scan
    for number in numbers:
        mats = scans['Scan%03d' % number].edfmats
        for frame in mats:
            image += frame
    return image
def create_diff_image(scans, scannumbers, energy_keV):
    """ Returns the difference image (above minus below 'energy_keV')
    summed over all scans with numbers 'scannumbers'.

    scans       = dictionary of objects from the scan-class
    scannumbers = single scannumber, or list of scannumbers from which
                  an image should be constructed
    energy_keV  = energy (in keV) separating the 'below' and 'above' frames
    """
    # make 'scannumbers' iterable (even if it is just an integer)
    numbers = scannumbers if isinstance(scannumbers, list) else [scannumbers]

    key = 'Scan%03d' % numbers[0]
    below_image = np.zeros_like(scans[key].edfmats[0, :, :])
    above_image = np.zeros_like(scans[key].edfmats[0, :, :])

    # find indices below and above 'energy_keV'
    # fixed: the original iterated the raw boolean masks, so 'ii' took the
    # values True/False and only frames 0/1 were ever summed; convert the
    # masks to integer frame indices first
    below_inds = np.where(scans[key].energy < energy_keV)[0]
    above_inds = np.where(scans[key].energy > energy_keV)[0]

    for number in numbers:
        key = 'Scan%03d' % number
        for ii in below_inds:
            below_image += scans[key].edfmats[ii, :, :]
        for ii in above_inds:
            above_image += scans[key].edfmats[ii, :, :]

    return (above_image - below_image)
def findScans_bytype(scans, tipo):
    """ **findRCscans**
    Returns a list of scans with name tipo.
    """
    # collect every scan whose reported type matches 'tipo'
    return [scan for scan in scans.values() if scan.get_type() == tipo]
def sum_scans_to_group( group, method='sum', interp=False ):
    """ **sum_scans_to_group**

    Sums up all scans in the list group to form a scan-group.

    Args:
        group (list): List of scans to be added up.
        method (str): Keyword describing which data analysis method
            should be used. Possible values are 'sum', 'pixel', 'row'.
            Default is 'sum'.
        interp (boolean): Flag if interpolation onto the energy grid of
            the first scan should be applied (default is False).

    Returns:
        An instance of the Scan class containing the summed up results.
    """
    # seed the result with the first scan of the group
    summed_group = Scan()
    first = group[0]
    summed_group.energy      = first.energy
    summed_group.monitor     = first.monitor
    summed_group.raw_signals = first.raw_signals
    summed_group.raw_errors  = first.raw_errors

    # accumulate the remaining scans
    for scan in group[1:]:
        summed_group.add_scan( scan, method=method, interp=interp )
    return summed_group
def edf_cleaner(edfmats, threshold, dim1_range=[60,190], dim2_range=[10,1286] ):
    """ **clean_edf_stack**

    Removes totally saturated detector images from the EDF-files.

    Args:
        edfmats (np.array): Three-dimensional array of EDF-matrices
            (frames along axis 0); modified in place.
        threshold (float): ROI intensity sum at/above which a frame is
            considered corrupted.
        dim1_range (list): [start, stop] rows of the ROI used for the test.
        dim2_range (list): [start, stop] columns of the ROI used for the test.

    Returns:
        edfmats (np.array): Cleaned stack of EDF-files.
    """
    def _find_corrupted():
        # frame indices whose ROI sum reaches the threshold
        roi = edfmats[:, dim1_range[0]:dim1_range[1], dim2_range[0]:dim2_range[1]]
        return np.where(roi.sum(axis=(1, 2)) >= threshold)[0]

    num_to_replace = _find_corrupted()
    print('found corrupted images at: ', num_to_replace)

    # handle pairs of neighboring corrupted images first
    # fixed: the original compared the np.where() *tuple* to 0 (TypeError
    # on Python 3) and then iterated that tuple instead of the index array
    double_bad_images = np.where(np.diff(num_to_replace) == 1)[0]
    if len(double_bad_images) > 0:
        print('found neighboring corrupted images!' , num_to_replace[double_bad_images])
        for number in double_bad_images:
            if number > 0:
                # replace the first image of the pair by the previous one
                edfmats[num_to_replace[number],:,:] = edfmats[num_to_replace[number]-1,:,:]
            elif number == 0:
                # replace the second image of the pair by the next one
                edfmats[num_to_replace[number]+1,:,:] = edfmats[num_to_replace[number]+2,:,:]

    # search again for remaining single bad images
    num_to_replace = _find_corrupted()
    print('found corrupted images at: ', num_to_replace)

    for number in num_to_replace:
        try:
            if 0 < number < (edfmats.shape[0]-1):
                # interpolate if image to replace is in the center of the scan
                edfmats[number,:,:] = (edfmats[number-1,:,:] + edfmats[number+1,:,:])/2.0
            elif number == 0:
                # replace first image by second
                edfmats[number,:,:] = edfmats[number+1,:,:]
            elif number == edfmats.shape[0]-1:
                # fixed off-by-one: the last valid frame index is
                # shape[0]-1; the original compared against shape[0] and
                # therefore never repaired the last image
                edfmats[number,:,:] = edfmats[number-1,:,:]
        except Exception:
            # best effort: leave the frame untouched if replacement fails
            print('WARNING: could not replace image number %d'%number)
    return edfmats