#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
A library to get index of refraction (delta) or attenuation length.
Author: Maksim Rakitin (BNL)
2016
"""
import json
import math
import os
from bnlcrl import visualize as vis
from bnlcrl.utils import convert_types, defaults_file, read_json
parms = defaults_file(suffix='delta')
DAT_DIR = parms['dat_dir']
CONFIG_DIR = parms['config_dir']
DEFAULTS_FILE = parms['defaults_file']
[docs]class DeltaFinder:
def __init__(self, **kwargs):
# Check importable libs:
self._check_imports()
# Get input variables:
d = read_json(DEFAULTS_FILE)
self.server_info = d['server_info']
self.parameters = convert_types(d['parameters'])
self.default_e_min = self.parameters['e_min']['type'](self.parameters['e_min']['default'])
self.default_e_max = self.parameters['e_max']['type'](self.parameters['e_max']['default'])
for key, default_val in self.parameters.items():
if key in kwargs.keys():
setattr(self, key, self.parameters[key]['type'](kwargs[key]))
elif not hasattr(self, key) or getattr(self, key) is None:
setattr(self, key, default_val['default'])
self.characteristic_value = None
self.analytical_delta = None
self.closest_energy = None
self.content = None
self.raw_content = None
self.method = None # can be 'file', 'server', 'calculation'
self.output = None
self.elements = self.formula.split(',')
self.element = self.elements[-1]
if self.outfile:
self.save_to_file()
return
if not self.data_file:
self.method = 'server'
self._request_from_server()
else:
self.method = 'file'
self.data_file = os.path.join(DAT_DIR, self.data_file)
if self.calc_delta:
if self.available_libs['periodictable']:
self.method = 'calculation'
self.calculate_delta()
self.characteristic_value = self.analytical_delta
self.closest_energy = self.energy
else:
raise ValueError('"periodictable" library is not available. Install it if you want to use it.')
else:
self._find_characteristic_value()
if self.verbose:
self.print_info()
if self.save_output:
return_dict = {}
for k in d['cli_functions']['find_delta']['returns']:
return_dict[k] = getattr(self, k)
file_name = '{}.json'.format(_output_file_name(self.elements, self.characteristic))
with open(file_name, 'w') as f:
json.dump(return_dict, f)
[docs] def calculate_delta(self):
rho = getattr(self.periodictable, self.formula).density
z = getattr(self.periodictable, self.formula).number
mass = getattr(self.periodictable, self.formula).mass
z_over_a = z / mass
wl = 2 * math.pi * 1973 / self.energy # lambda= (2pi (hc))/E
self.analytical_delta = 2.7e-6 * wl ** 2 * rho * z_over_a
[docs] def print_info(self):
msg = 'Found {}={} for the closest energy={} eV from {}.'
print(msg.format(self.characteristic, self.characteristic_value, self.closest_energy, self.method))
[docs] def save_to_file(self):
self.e_min = self.default_e_min
self.e_max = self.e_min
counter = 0
try:
os.remove(self.outfile)
except:
pass
while self.e_max < self.default_e_max:
self.e_max += self.n_points * self.e_step
if self.e_max > self.default_e_max:
self.e_max = self.default_e_max
self._request_from_server()
if counter > 0:
# Get rid of headers (2 first rows) and the first data row to avoid data overlap:
content = self.content.split('\n')
self.content = '\n'.join(content[3:])
with open(self.outfile, 'a') as f:
f.write(self.content)
counter += 1
self.e_min = self.e_max
if self.verbose:
print('Data from {} eV to {} eV saved to the <{}> file.'.format(
self.default_e_min, self.default_e_max, self.outfile))
print('Energy step: {} eV, number of points/chunk: {}, number of chunks {}.'.format(
self.e_step, self.n_points, counter))
def _check_imports(self):
self.available_libs = {
'numpy': None,
'periodictable': None,
'requests': None,
}
for key in self.available_libs.keys():
try:
__import__(key)
setattr(self, key, __import__(key))
self.available_libs[key] = True
except:
self.available_libs[key] = False
def _find_characteristic_value(self):
skiprows = 2
energy_column = 0
characteristic_value_column = 1
error_msg = 'Error! Use energy range from {} to {} eV.'
if self.use_numpy and self.available_libs['numpy']:
if self.data_file:
data = self.numpy.loadtxt(self.data_file, skiprows=skiprows)
self.default_e_min = data[0, energy_column]
self.default_e_max = data[-1, energy_column]
try:
idx_previous = self.numpy.where(data[:, energy_column] <= self.energy)[0][-1]
idx_next = self.numpy.where(data[:, energy_column] > self.energy)[0][0]
except IndexError:
raise Exception(error_msg.format(self.default_e_min, self.default_e_max))
idx = idx_previous if abs(data[idx_previous, energy_column] - self.energy) <= abs(
data[idx_next, energy_column] - self.energy) else idx_next
self.characteristic_value = data[idx][characteristic_value_column]
self.closest_energy = data[idx][energy_column]
else:
raise Exception('Processing with NumPy is only possible with the specified file, not content.')
else:
if not self.content:
with open(self.data_file, 'r') as f:
self.raw_content = f.read()
else:
if type(self.content) != list:
self.raw_content = self.content
self.content = self.raw_content.strip().split('\n')
energies = []
characteristic_values = []
for i in range(skiprows, len(self.content)):
energies.append(float(self.content[i].split()[energy_column]))
characteristic_values.append(float(self.content[i].split()[characteristic_value_column]))
self.default_e_min = energies[0]
self.default_e_max = energies[-1]
indices_previous = []
indices_next = []
try:
for i in range(len(energies)):
if energies[i] <= self.energy:
indices_previous.append(i)
else:
indices_next.append(i)
idx_previous = indices_previous[-1]
idx_next = indices_next[0]
except IndexError:
raise Exception(error_msg.format(self.default_e_min, self.default_e_max))
idx = idx_previous if abs(energies[idx_previous] - self.energy) <= abs(
energies[idx_next] - self.energy) else idx_next
self.characteristic_value = characteristic_values[idx]
self.closest_energy = energies[idx]
if self.characteristic == 'atten':
self.characteristic_value *= 1e-6 # Atten Length (microns)
def _get_remote_file_content(self):
get_url = '{}{}'.format(self.server_info['server'], self.file_name)
r = self.requests.get(get_url)
self.content = r.text
return self.content
def _get_remote_file_name(self, formula=None):
if self.precise:
e_min = self.energy - 1.0
e_max = self.energy + 1.0
else:
e_min = self.e_min
e_max = self.e_max
payload = {
self.server_info[self.characteristic]['fields']['density']: -1,
self.server_info[self.characteristic]['fields']['formula']: formula,
self.server_info[self.characteristic]['fields']['material']: 'Enter Formula',
self.server_info[self.characteristic]['fields']['max']: e_max,
self.server_info[self.characteristic]['fields']['min']: e_min,
self.server_info[self.characteristic]['fields']['npts']: self.n_points,
self.server_info[self.characteristic]['fields']['output']: 'Text File',
self.server_info[self.characteristic]['fields']['scan']: 'Energy',
}
if self.characteristic == 'atten':
payload[self.server_info[self.characteristic]['fields']['fixed']] = 90.0
payload[self.server_info[self.characteristic]['fields']['plot']] = 'Log'
payload[self.server_info[self.characteristic]['fields']['output']] = 'Plot',
elif self.characteristic == 'transmission':
payload[self.server_info[self.characteristic]['fields']['plot']] = 'Linear'
payload[self.server_info[self.characteristic]['fields']['output']] = 'Plot',
payload[self.server_info[self.characteristic]['fields']['thickness']] = self.thickness # um
r = self.requests.post(
'{}{}'.format(self.server_info['server'], self.server_info[self.characteristic]['post_url']),
payload
)
content = r.text
# The file name should be something like '/tmp/xray2565.dat':
try:
self.file_name = str(
content.split('{}='.format(self.server_info[self.characteristic]['file_tag']))[1]
.split('>')[0]
.replace('"', '')
)
except:
raise Exception('\n\nFile name cannot be found! Server response:\n<{}>'.format(content.strip()))
def _request_from_server(self):
if self.available_libs['requests']:
d = []
for f in self.formula.split(','): # to support multiple chemical elements comma-separated list
self._get_remote_file_name(formula=f)
r = self._get_remote_file_content()
d.append(r)
if self.plot or self.save:
df, columns = vis.to_dataframe(d, self.elements)
if df is not None and columns is not None:
file_name = _output_file_name(self.elements, self.characteristic)
if self.plot:
vis.plot_data(
df=df,
elements=self.elements,
property=self.characteristic,
thickness=self.thickness,
e_min=self.e_min,
e_max=self.e_max,
n_points=self.n_points,
file_name=file_name,
x_label=columns[0],
show_plot=self.show_plot,
)
if self.save:
vis.save_to_csv(df=df, file_name=file_name)
else:
msg = 'Cannot use online resource <{}> to get {}. Use local file instead.'
raise Exception(msg.format(self.server_info['server'], self.characteristic))
def _output_file_name(elements, characteristic):
return '{}_{}'.format(','.join(elements), characteristic) if len(elements) > 1 else characteristic