# -*- coding:utf-8 -*-
"""PyHdust *phc* module: physical constants and general use functions
Includes functions for:
- Geometrical functions in deg
- Data manipulation (average, bin, fitting)
- Manipulation of 3D coordinates
- Convolution functions
- Manipulation of strings and lists
- Manipulation of angular coordinates and dates
- Files or directories manipulation
- Plotting
- Physical functions
- `List of constants`_
- Python-version issues
.. _`List of constants`: phc_list.html
:license: GNU GPL v3.0 https://github.com/danmoser/pyhdust/blob/master/LICENSE
"""
from __future__ import print_function
import sys as _sys
import os as _os
import re as _re
import numpy as _np
import datetime as _dt
from dateutil.relativedelta import relativedelta as _dtdelta
import gzip as _gzip
from bz2 import BZ2File as _BZ2File
from glob import glob as _glob
from itertools import product as _product
import pyhdust.jdcal as _jdcal
from pyhdust.tabulate import tabulate as _tab
from six import string_types as _strtypes
from functools import reduce as _ftreduce
import warnings as _warn
import struct as _struct
import unicodedata as _unicdata
from dateutil import tz as _tz
try:
import csv as _csv
import matplotlib.pyplot as _plt
from matplotlib.ticker import AutoMinorLocator as _AutoMinorLocator
from scipy import optimize as _optimize
except ImportError:
("matplotlib, csv and/or scipy module not installed!!!")
try:
from collections.abc import Iterable as _It
except ImportError:
from collections import Iterable as _It
__author__ = "Daniel Moser"
__email__ = "dmfaes@gmail.com"
# Trigonometric functions in deg
[docs]def cos(ang):
"""Ang in DEGREES"""
return _np.cos(ang * _np.pi / 180)
[docs]def sin(ang):
"""Ang in DEGREES"""
return _np.sin(ang * _np.pi / 180)
[docs]def arcsin(arg):
"""Output in DEGREES"""
return _np.arcsin(arg) * 180 / _np.pi
[docs]def arccos(arg):
"""Output in DEGREES"""
return _np.arccos(arg) * 180 / _np.pi
[docs]def tan(ang):
"""Ang in DEGREES"""
return _np.tan(ang * _np.pi / 180)
[docs]def arctan(arg):
"""Output in DEGREES"""
return _np.arctan(arg) * 180 / _np.pi
[docs]def arctan2(y, x):
"""Output in DEGREES"""
return _np.arctan2(y, x) * 180 / _np.pi
# Data manipulation (includes fitting)
def find_nearest_pt(x0, y0, x, y, z, case=1):
dx = _np.max(x) - _np.min(x)
dy = _np.max(y) - _np.min(y)
if case == 1:
idx = _np.where((x < x0) & (y < y0))
elif case == 2:
idx = _np.where((x > x0) & (y < y0))
elif case == 3:
idx = _np.where((x < x0) & (y > y0))
elif case == 4:
idx = _np.where((x > x0) & (y > y0))
else:
idx = _np.argsort(x)
z = z[idx]
if len(z) > 0:
x = x[idx]
y = y[idx]
d = _np.sqrt(((x0 - x) / dx) ** 2 + ((y0 - y) / dy) ** 2)
idx = _np.where(d == _np.min(d))
return (
_np.average(x[idx]),
_np.average(y[idx]),
_np.average(z[idx]),
_np.average(d[idx]),
)
else:
return _np.NaN, _np.NaN, _np.NaN, _np.NaN
def baricent_calc(x0, y0, x, y, z, fullrange=False):
x1, y1, z1, d1 = find_nearest_pt(x0, y0, x, y, z, case=1)
x2, y2, z2, d2 = find_nearest_pt(x0, y0, x, y, z, case=2)
x3, y3, z3, d3 = find_nearest_pt(x0, y0, x, y, z, case=3)
x4, y4, z4, d4 = find_nearest_pt(x0, y0, x, y, z, case=4)
if _np.sum(_np.isnan([x1, x2, x3, x4])) == 0:
# DO THE BARICENTER CALCULATIONS HERE
idx = _np.argsort([d1, d2, d3, d4])
xs = _np.array([x1, x2, x3, x4])[idx]
ys = _np.array([y1, y2, y3, y4])[idx]
zs = _np.array([z1, z2, z3, z4])[idx]
inside = False
seq = _np.array([[0, 1, 2], [0, 1, 3], [0, 2, 3], [1, 2, 3]])
for i in seq:
# http://mathworld.wolfram.com/TriangleInterior.html
# http://stackoverflow.com/questions/13300904/
# determine-whether-point-lies-inside-triangle
if not inside:
X, Y = (xs[i], ys[i])
alpha = ((Y[1] - Y[2]) * (x0 - X[2]) + (X[2] - X[1]) * (y0 - Y[2])) / (
(Y[1] - Y[2]) * (X[0] - X[2]) + (X[2] - X[1]) * (Y[0] - Y[2])
)
beta = ((Y[2] - Y[0]) * (x0 - X[2]) + (X[0] - X[2]) * (y0 - Y[2])) / (
(Y[1] - Y[2]) * (X[0] - X[2]) + (X[2] - X[1]) * (Y[0] - Y[2])
)
gamma = 1.0 - alpha - beta
if alpha > 0 and beta > 0 and gamma > 0:
Z = zs[i]
inside = True
# http://www.mathopenref.com/coordtrianglearea.html
a = _np.zeros(3)
a[2] = _np.abs(
0.5 * (X[0] * (Y[1] - y0) + X[1] * (y0 - Y[0]) + x0 * (Y[0] - Y[1]))
)
a[1] = _np.abs(
0.5 * (X[0] * (Y[2] - y0) + X[2] * (y0 - Y[0]) + x0 * (Y[0] - Y[2]))
)
a[0] = _np.abs(
0.5 * (X[1] * (Y[2] - y0) + X[2] * (y0 - Y[1]) + x0 * (Y[1] - Y[2]))
)
return _np.sum(Z * a) / _np.sum(a)
elif _np.sum(_np.isnan([x1, x2, x3, x4])) == 1 and fullrange:
idx = _np.argsort([d1, d2, d3, d4])
zs = _np.array([z1, z2, z3, z4])[idx][:3]
ds = _np.array([d1, d2, d3, d4])[idx][:3]
return _np.sum(zs / ds) / _np.sum(1 / ds)
else:
return _np.NaN
[docs]def baricent_map(x, y, z, res=100, fullrange=False, xfact=1.0, yfact=1.0):
"""_fact = apply an factor in the input values **in the interpolation**"""
X = _np.linspace(_np.min(x), _np.max(x), res)
Y = _np.linspace(_np.min(y), _np.max(y), res)
# Must be [::-1] to match the img std
X, Y = _np.meshgrid(X, Y[::-1])
img = _np.zeros((res, res))
for i, j in _product(range(res), range(res)):
img[i, j] = baricent_calc(
X[i, j],
Y[i, j],
_np.array(x) * xfact,
_np.array(y) * yfact,
z,
fullrange=fullrange,
)
return img
[docs]def flatten(list_, outlist=None):
"""Return a flatten list.
If ``outlist`` is provided, the output is extended to it.
"""
if outlist is None:
outlist = []
for item in list_:
if isinstance(item, _It) and not isinstance(item, _strtypes):
flatten(item, outlist)
else:
outlist.append(item)
return outlist
[docs]def wg_avg_and_std(values, sigma):
"""Return the weighted average and standard deviation.
This IS NOT the problem of Wikipedia > Weighted_arithmetic_mean +
Weighted_sample_variance!
average = _np.average(values, weights=weights)
# Fast and numerically precise:
variance = _np.average((values-average)**2, weights=weights)
return (average, _np.sqrt(variance))
INPUT: values, sigma -- arrays with the same shape.
OUTPUT: average, avg_sig (float, float)
"""
values = _np.array(values)
sigma = _np.array(sigma)
avg = _np.average(values, weights=1.0 / sigma)
return avg, _np.sqrt(_np.sum(sigma**2)) / len(values)
[docs]def wg_avg_t(t, y, yerr=None):
"""average weighted on ``t`` with optional ``yerr`` array.
Intended for binning use. To recover the weighted value of ``t``, use
wg_avg_and_std.
Warning: If there is two (or more) ``y`` points for the same value of
``t``, all these points will be ignored.
INPUT: ``t``, ``y`` -- arrays with the same shape.
OUTPUT: average
"""
idx = _np.argsort(t)
t = _np.array(t)[idx]
y = _np.array(y)[idx]
n = len(t)
if yerr is None:
yerr = _np.ones(n).astype(float)
dx = _np.diff(t)
avg = 0.0
for i in range(n - 1):
if dx[i] == 0:
_warn("# phc.wg_avg_t: multiple points for same t value!")
avg += (
dx[i]
* (y[i + 1] / yerr[i + 1] + y[i] / yerr[i])
/ (1.0 / yerr[i] + 1.0 / yerr[i + 1])
)
return avg / _np.sum(dx)
[docs]def fill_points(in_arr, dx, n=1):
"""Return a list of points to be filled in an array.
If the distance of two sucessive points is bigger than ``dx``, then ``n``
new points will be generated.
:param in_arr: Input array.
"""
size = len(in_arr)
new_x = []
idx_extra = []
j = 0
for i in range(size - 1):
new_x += [in_arr[i]]
j += 1
if abs(in_arr[i + 1] - in_arr[i]) > dx:
npts = int(round(abs(in_arr[i + 1] - in_arr[i]) / dx))
tmp = list(_np.linspace(in_arr[i], in_arr[i + 1], npts + n)[1:-1])
new_x += tmp
idx_extra += [k + j for k in range(len(tmp))]
j += len(tmp)
new_x += [in_arr[-1]]
return new_x, idx_extra
[docs]def change_dimension(x, nred=-1, fixedlim=False):
"""Reduce the dimension of an **equally spaced** vector.
`nred`: reduces the vector `x` size if nred < 0 and increases it otherwise.
`fixedlim` keeps the x limits values. Others
"""
if len(_np.shape(x)) != 1:
_warn("# Invalid x shape!!")
return None
nred = int(nred)
if nred == 0:
return x
return _np.linspace(_np.min(x), _np.max(x), len(x) + nred)
[docs]def bindata(x, y, yerr=None, nbins=20, xlim=None, interp=False, perc=0, weightx=True):
"""
Return the weighted binned data. It is assumed no errors for x values.
`interp` linearly interpolates NaN values in `y` (no changes on `x` and
`yerr` arrays).
if ``perc > 0``, then it returns the percentile value of the interval. In
this case, the output xvals will be the ones from the closest y
value within a given bin corresponding to that percentile.
`weightx` only works if `perc=0`. `True` returns the simply averaged `x`
value of a given bin interval (it is not based on y or yerr values);
`False` returns the regularly spaced `x` bin values.
INPUT: x, y, yerr - arrays with the same shape (they don't need to be
sorted); nbins=int, xlim=[xmin, xmax].
OUTPUT: xvals, yvals, new_yerr [if `yerr!=None`]; (arrays)
"""
x = _np.array(x)
y = _np.array(y)
if yerr is None:
yerr = _np.ones(len(x))
else:
yerr = _np.array(yerr)
nans = _np.isnan(y)
if interp and _np.sum(nans) > 0:
y = _np.interp(x, x[~nans], y[~nans])
else:
x = x[~nans]
y = y[~nans]
yerr = yerr[~nans]
if xlim is None:
xmax = _np.max(x)
xmin = _np.min(x)
else:
xmin, xmax = xlim
shift = (xmax - xmin) / (nbins - 1.0)
tmpx = _np.arange(nbins) * shift + xmin
tmpy = _np.zeros(nbins)
tmpyerr = _np.zeros(nbins)
idx = _np.zeros(nbins, dtype=bool)
for i in range(nbins):
selx = _np.where(abs(x - tmpx[i]) <= shift / 2.0 + 1e-6)
if len(selx[0]) >= 1:
if perc <= 0:
if weightx:
tmpx[i] = _np.average(x[selx])
tmpy[i], tmpyerr[i] = wg_avg_and_std(y[selx], yerr[selx])
else:
tmpy[i] = _np.percentile(y[selx], perc)
pid = find_nearest(y[selx], tmpy[i], idx=True)
if weightx:
tmpx[i] = x[selx][pid]
_, tmpyerr[i] = wg_avg_and_std(y[selx], yerr[selx])
idx[i] = True
if _np.sum(yerr) / len(x) == 1:
return tmpx[idx], tmpy[idx]
else:
return tmpx[idx], tmpy[idx], tmpyerr[idx]
[docs]def chi2calc(mod, obs, sig_obs=None, npar=1, fast=False):
"""Calculate the chi2.
If `fast=False`, the function skips a NaN test in the arrays."""
if sig_obs is None:
sig_obs = _np.ones(len(obs))
if not fast:
nans, tmp = nan_helper(obs)
obs = _np.array(obs)[~nans]
mod = _np.array(mod)[~nans]
sig_obs = _np.array(sig_obs)[~nans]
return _np.sum((mod - obs) ** 2 / sig_obs**2) / (len(sig_obs) - npar - 1)
[docs]def splitequal(n, N):
"""Split `N` in approx. `n` igual parts
`N` must be integer.
Suggestion: *phc.splitequal(N/8., N)* split N into sequences of
approx. 8 itens.
"""
n = int(round(n))
idx = []
for i in range(n):
idx.append([i * N / n, (i + 1) * N / n])
if n == 0:
idx = [[0, N]]
return idx
[docs]def interLinND(X, X0, X1, Fx, disablelog=False):
"""
N-dimensional linear interpolation in LOG space!!
Pay attention: Fx must always be > 0. If not, put disablelog=True.
Other important thing: Fx must be regularly spaced (i.e., [Fx0y0, Fx0y1,
Fx1y0, Fx1y1] if X0=[x0,y0] and X1=[x1,y1]).
If it is not the case, see *interBar?D* function.
| INPUT:
| X = position in with the interpolation is desired;
| X0 = minimal values of the interval;
| X1 = vmaximum values of the inveral
| Fx = function values along the interval, ORDERED BY DIMENSTION.
| Example: Fx = [F00, F01, F10, F11]
OUTPUT: interpolated value (float)"""
X = _np.array(X)
X0 = _np.array(X0)
X1 = _np.array(X1)
Xd = (X - X0) / (X1 - X0)
DX = _np.array([[(1 - x), x] for x in Xd])
#
i = 0
F = 0
for prod in _product(*DX):
if disablelog:
F += Fx[i] * _np.product(prod)
else:
F += _np.log(Fx[i]) * _np.product(prod)
i += 1
#
if not disablelog:
return _np.exp(F)
else:
return F
[docs]def optim(p0, x, y, yerr, func, errfunc=None):
"""Do scipy.optimize.leastsq minimization.
Default error function is the chi2 function.
Requirements:
- func(p, x) is a previously user defined function.
- len(x)=len(y)=len(yerr)
Output:
best parameters (p), chi2_red value
"""
if errfunc is None:
def errfunc(p, x, y, yerr, func):
"""error function"""
# return _np.sum( ((y-func(p,x))/yerr)**2 )
return ((y - func(p, x)) / yerr) ** 2
bestp, tmp = _optimize.leastsq(errfunc, p0, args=(x, y, yerr, func))
c2red = chi2calc(func(bestp, x), y, yerr, npar=len(p0))
return bestp, c2red
[docs]def optim2(p0, x, y, yerr, func):
"""Do scipy.optimize.curve_fit minimization.
It returns errors to the parameters fitting!!!
Requirements:
- func(p, x) is a previously user defined function.
- len(x)=len(y)=len(yerr)
Output:
best params (p), params errors (perr), chi2_red value
"""
bestp, cov = _optimize.curve_fit(func, x, y, p0=p0, sigma=yerr)
c2red = chi2calc(func(x, *bestp), y, yerr, npar=len(p0))
return bestp, _np.sqrt(_np.diag(cov)), c2red
[docs]def bin_ndarray(ndarray, new_shape, operation="avg"):
"""
Bins an ndarray in all axes based on the target shape, by summing or
averaging.
Number of output dimensions must match number of input dimensions.
Example
-------
>>> m = _np.arange(0,100,1).reshape((10,10))
>>> n = bin_ndarray(m, new_shape=(5,5), operation='sum')
>>> print(n)
[[ 22 30 38 46 54]
[102 110 118 126 134]
[182 190 198 206 214]
[262 270 278 286 294]
[342 350 358 366 374]]
"""
if not operation.lower() in ["sum", "mean", "average", "avg"]:
raise ValueError("Operation {} not supported.".format(operation))
if ndarray.ndim != len(new_shape):
raise ValueError("Shape mismatch: {} -> {}".format(ndarray.shape, new_shape))
compression_pairs = [(d, c // d) for d, c in zip(new_shape, ndarray.shape)]
flattened = [j for p in compression_pairs for j in p]
# print flattened
ndarray = ndarray.reshape(flattened)
for i in range(len(new_shape)):
if operation.lower() == "sum":
ndarray = ndarray.sum(-1 * (i + 1))
elif operation.lower() in ["mean", "average", "avg"]:
ndarray = ndarray.mean(-1 * (i + 1))
return ndarray
# Convolution functions
[docs]def normgauss(sig, x=None, xc=0.0):
"""Normalized Gaussian function.
INPUT: sigma value (float), x (array), xc (float)
OUTPUT: yvals (array)
"""
if x is None:
x = _np.linspace(-5 * sig, 5 * sig, 101)
return (
1.0
/ (2 * _np.pi * sig**2) ** 0.5
* _np.exp(-((x - xc) ** 2.0) / (2 * sig**2.0))
)
[docs]def normbox(hwidth, x=None, xc=0.0):
"""Normalized Box function.
INPUT: half-width (number), x (array), xc (float)
OUTPUT: yvals (array)
"""
if x is None:
x = _np.linspace(-hwidth * 2 + xc, hwidth * 2 + xc, 101)
y = _np.zeros(len(x))
idx = _np.where(_np.abs(x - xc) <= hwidth)
y[idx] = _np.zeros(len(y[idx])) + 2.0 / hwidth
return y
[docs]def convnorm(x, arr, pattern):
"""Do the convolution of arr with pattern.
Vector x is required for normalization. Its length must be odd!
INPUT: units space (x, array), original array (arr), pattern (array)
OUTPUT: array
"""
if len(x) != len(pattern):
_warn.warn("Wrong format of x and/or pattern arrays!")
return None
if len(x) % 2 == 0:
_warn.warn("Even length of arrays. Interpolating n-1 " "dimension!")
idx = _np.argsort(x)
x0 = x[idx]
pattern0 = pattern[idx]
x = _np.linspace(x0[0], x0[-1], len(x0) - 1)
pattern = _np.interp(x, x0, pattern0)
dx = (x[-1] - x[0]) / (len(x) - 1.0)
cut = len(x) // 2
return _np.convolve(pattern, arr)[cut:-cut] * dx
# 3D Coordinates manipulation (rotation)
[docs]def cart2sph(x, y, z=None):
"""Cartesian to spherical coordinates. Output in **rad**.
**Warning**: :math:`z=0\\rightarrow \\theta=\\pi`.
TODO: check is len(x)==len(y)
INPUT: arrays of same length
OUTPUT: arrays
"""
if z is None:
z = _np.zeros(len(x))
r = _np.sqrt(x**2 + y**2 + z**2)
th = _np.arccos(z / r)
phi = _np.arctan2(y, x)
return r, phi, th
[docs]def sph2cart(r, phi, th=None):
"""**Warning**: the sequence is :math:`(r, \\phi, \\theta)`."""
if th is None:
th = _np.zeros(len(r))
x = r * _np.sin(th) * _np.cos(phi)
y = r * _np.sin(th) * _np.sin(phi)
z = r * _np.cos(th)
return x, y, z
[docs]def cart_rot(x, y, z, ang_xy=0.0, ang_yz=0.0, ang_zx=0.0):
"""Apply rotation in Cartesian coordinates.
INPUT: 3 arrays of same length, 3 angles (float, in radians).
OUTPUT: arrays"""
rotmtx = _np.array(
[
[
_np.cos(ang_zx) * _np.cos(ang_xy),
-_np.cos(ang_yz) * _np.sin(ang_xy)
+ _np.sin(ang_yz) * _np.sin(ang_zx) * _np.cos(ang_xy),
_np.sin(ang_yz) * _np.sin(ang_xy)
+ _np.cos(ang_yz) * _np.sin(ang_zx) * _np.cos(ang_xy),
],
[
_np.cos(ang_zx) * _np.sin(ang_xy),
_np.cos(ang_yz) * _np.cos(ang_xy)
+ _np.sin(ang_yz) * _np.sin(ang_zx) * _np.sin(ang_xy),
-_np.sin(ang_yz) * _np.cos(ang_xy)
+ _np.cos(ang_yz)
+ _np.sin(ang_zx) * _np.sin(ang_xy),
],
[
-_np.sin(ang_zx),
_np.sin(ang_yz) * _np.cos(ang_zx),
_np.cos(ang_yz) * _np.cos(ang_zx),
],
]
)
vec = _np.array([x, y, z])
return _np.dot(rotmtx, vec)
# Lists and strings manipulation
[docs]def isnumber(val, contains=False):
"""Return [True|False] if a string can be converted to a number (int,
float or scientific notation).
`contains`: check if the string CONTAINS a valid number.
"""
rule = r"-?[\d.]+(?:e-?\d+)?"
match = _re.findall(rule, val)
isn = False
if len(match) == 1:
isn = True
if not contains:
if match[0] != val:
isn = False
return isn
[docs]def str2float(val):
"""Returns a float converted from the string ``val``.
If more than one float is identified, it returns ``None``"""
rule = r"-?[\d.]+(?:e-?\d+)?"
match = _re.findall(rule, val)
if len(match) != 1:
return None
return float(match[0])
[docs]def closest_idx(arr, mtx):
"""Return the closest index of `mtx` corresponding to the values of `arr`."""
lidx = []
for i in range(len(arr)):
lidx.append(_np.where(mtx[:, i] == find_nearest(mtx[:, i], arr[i]))[0])
return _ftreduce(_np.intersect1d, lidx)[0]
[docs]def unic2ascii(string):
"""Converts unicode to ASCII"""
return _unicdata.normalize("NFKD", string).encode("ascii", "ignore")
[docs]def readpck(n, tp, ixdr, f):
"""Read XDR
- n: length
- tp: type ('i', 'l', 'f', 'd')
- ixdr: counter
- f: file-object
:returns: ixdr (counter), _np.array
"""
sz = dict(zip(["i", "l", "f", "d"], [4, 4, 4, 8]))
s = sz[tp]
upck = ">{0}{1}".format(n, tp)
return ixdr + n * s, _np.array(_struct.unpack(upck, f[ixdr : ixdr + n * s]))
[docs]def reshapeltx(ltxtb, ncols=2, latexfmt=True):
"""Reshape a latex table.
:param ltxtb: latex table (only contents)
:type ltxtb: string, or list of strings
:rtype: string, or _np.array (str)
:returns: latex formatted table, or matrix
"""
t = ltxtb
if not isinstance(t, _strtypes):
t = "".join(flatten(t))
t = t.replace(" ", "").replace(r"\\", "&").replace("\n", "")
t = t.split("&")
t = _np.array(t)
t = _np.delete(t, _np.where(t == ""))
nadd = ncols - len(t) % ncols
if nadd > 0:
t = _np.append(t, _np.tile("", nadd))
if latexfmt:
return _tab(t.reshape((-1, ncols)), tablefmt="latex")
else:
return t.reshape((-1, ncols))
# def find_after(id, ref):
# """ Returns the line where `id` if found after `ref`.
# """
# return
[docs]def log_norm(x, vmax=1.0, vmin=0.01, autoscale=True, clip=True):
"""Renormalize ``x`` (value or vector) making a correspondance of [0-1]
to [vmin, vmax] in log scale.
If ``autoscale`` and x is a vector, ``vmax`` and ``vmin`` are automatically
set (``x`` must have at least a positive value).
``clip`` force the range to be between 0 and 1.
"""
x = _np.array(x)
if autoscale:
(vmin, vmax) = (_np.min(x[_np.where(x > 0)]), _np.max(x))
if vmin <= 0:
_warn.warn("vmin <= 0 at phc.log_norm!")
return x
if vmax <= vmin:
_warn.warn("vmax <= vmin at phc.log_norm!")
return x
if clip:
x = x.clip(vmin, vmax)
return _np.log10(x / vmin) / _np.log10(vmax / vmin)
[docs]def renormvals(xlist, xlims, ylims):
"""Renormalize ``xlist`` according to ``_lims`` values.
len(xlims)=len(ylims)=2
"""
a = _np.diff(ylims) / _np.diff(xlims)
b = ylims[0] - a * xlims[0]
return a * _np.array(xlist) + b
[docs]def expand_list(xlist, logspaced=False):
"""Expand ``xlist`` centered values to interval limits.
`logspaced` assumed to be in base 10.
OUTPUT: array[len(xlist)+1]
"""
if logspaced:
xlist = _np.log10(xlist)
d = _np.diff(xlist)
out = _np.zeros(len(xlist) + 1)
out[0] = xlist[0] - d[0] / 2
out[1:-1] = xlist[:-1] + d / 2
out[-1] = xlist[-1] + d[-1] / 2
if logspaced:
out = 10**out
return out
[docs]def keys_values(keys, text, delimiter="_"):
"""Return the values in a string coded as *KeyValueDelimiter*. The keys
do not need to be in order in the text.
Important! The text can not start with a keyword (if it is the case, add
a delimiter first) and it must have a delimiter after the last key.
TODO: Relax the important message considering starting with the first key
and a sign as last delimiter.
Example:
.. code:: python
keys = ['M', 'ob', 'H', 'Z', 'b']
text = 'Be_M04.20_ob1.30_H0.30_Z0.014_bE_Ell.txt'
print( phc.keys_values(keys, text) )
"""
vals = []
for k in keys:
rule = ".*{0}{1}([0-9.e+-]+)[{0}\\.].*".format(delimiter, k)
out = _re.findall(rule, text, flags=_re.DOTALL)
if len(out) == 1:
vals += [out[0]]
else:
vals += [""]
_warn.warn("# Invalid key {0} for {1}".format(k, text))
return vals
[docs]def fltTxtOccur(s, lines, n=1, seq=1, after=True, asstr=False):
"""Return the seq-th float of the line after the n-th
occurrence of `s` in the array `lines`.
INPUT: s=string, lines=array of strings, n/seq=int (starting at 1)
OUTPUT: float"""
if isinstance(lines, _strtypes):
lines = [lines]
fltregex = r"[-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?"
if after:
occur = [x[x.find(s) + len(s) :] for x in lines if x.find(s) > -1]
else:
occur = [x for x in lines if x.find(s) > -1]
out = _np.NaN
if len(occur) >= n:
occur = occur[n - 1]
out = _re.findall(fltregex, occur)[seq - 1]
if not asstr:
out = float(out)
return out
[docs]def strrep(seq, n, newseq):
"""Insert `newseq` at position `n` of the string `seq`.
seq[n] = seq[:n]+newseq+seq[n+1:]
Note: the string at the position `n` is replaced!!!
OUTPUT: string
"""
return seq[:n] + newseq + seq[n + 1 :]
[docs]def find_nearest(array, value, bigger=None, idx=False):
"""Find nearest VALUE in the array and return it.
INPUT: array, value
OUTPUT: closest value (array dtype)
"""
if bigger is None:
array = _np.array(array)
i = (_np.abs(array - value)).argmin()
found = array[i]
elif bigger:
found = _np.min([x for x in array if x > value])
i = _np.where(array == found)
i = flatten(i)[0]
elif not bigger:
found = _np.max([x for x in array if x < value])
i = _np.where(array == found)
i = flatten(i)[0]
else:
_warn.warn("# ERROR at bigger!!")
# return
if not idx:
return found
else:
return i
[docs]def find_nearND(matrix, array, idx=False, bigger=None, outlen=1):
"""Find nearest array values in a MATRIX.
INPUT: array, value
OUTPUT: closest value (array dtype)
"""
if len(array) == 1:
array = array[0]
if _np.shape(matrix)[1] != len(array):
raise ValueError("shape(matrix)[1] must be == len(array)")
dists = []
for i in range(len(array)):
nfact = _np.max(matrix[:, i]) - _np.min(matrix[:, i])
if nfact <= 0:
continue
dists.append(_np.abs(matrix[:, i] - array[i]) / nfact)
dists = _np.sum(dists, axis=0)
idsort = _np.argsort(dists)
if bigger:
valid = []
for nid in idsort:
chk = [True for i in range(len(array)) if (matrix[nid, i] > array[i])]
if len(chk) == len(array):
valid.extend([nid])
if len(valid) == outlen:
idsort = valid
break
if valid == []:
_warn.warn("# Invalid values for bigger == {0}".format(bigger))
elif bigger is False:
valid = []
for nid in idsort:
chk = [True for i in range(len(array)) if (matrix[nid, i] < array[i])]
if len(chk) == len(array):
valid.extend([nid])
if len(valid) == outlen:
idsort = valid
break
if valid == []:
_warn.warn("# Invalid values for bigger == {0}".format(bigger))
# return
if "valid" in locals():
if len(valid) == 0:
print(idsort)
raise ValueError("Invalid values for bigger == {0}".format(bigger))
if not idx:
if outlen <= 1:
return matrix[idsort[0]]
else:
return matrix[idsort[:outlen]]
else:
if outlen <= 1:
return idsort[0]
else:
return idsort[:outlen]
[docs]def is_inside_ranges(par, ranges):
"""
Checks if par is inside ranges, returns boolean value.
Usage:
inside_ranges = is_inside_ranges(par, ranges)
"""
ranges = _np.array(ranges)
if len(ranges.flatten()) == 2:
inside_ranges = (par >= ranges[0]) and (par <= ranges[1])
else:
count = 0
inside_ranges = True
while inside_ranges * (count < len(par)):
inside_ranges = (par[count] >= ranges[count, 0]) and (
par[count] <= ranges[count, 1]
)
count += 1
return inside_ranges
[docs]def find_neighbours(par, par_grid, ranges, silent=True):
"""
Finds neighbours' positions of par in par_grid.
Usage:
keep, out, inside_ranges, par_new, par_grid_new = find_neighbours(par,
par_grid, ranges, silent=True)
where redundant columns in 'new' values are excluded, but length is
preserved (i.e., par_grid[keep] in griddata call).
"""
# check if inside ranges
inside_ranges = is_inside_ranges(par, ranges)
# find neighbours
keep = _np.array(len(par_grid) * [True])
out = []
for i in range(len(par)):
# out of ranges
if not is_inside_ranges(par[i], ranges[i]):
if not silent:
print(
"[find_neighbours] Warning: parameter entry out of "
"allowed range, taking closest value"
)
keep *= _np.abs(par[i] - par_grid[:, i]) == _np.min(
_np.abs(par[i] - par_grid[:, i])
)
out.append(i)
# coincidence
elif (par[i] == par_grid[:, i]).any():
keep *= par[i] == par_grid[:, i]
out.append(i)
# is inside
else:
# list of values
par_list = _np.array(list(set(par_grid[:, i])))
# nearest value at left
par_left = par_list[par_list < par[i]]
par_left = par_left[_np.abs(par_left - par[i]).argmin()]
# nearest value at right
par_right = par_list[par_list > par[i]]
par_right = par_right[_np.abs(par_right - par[i]).argmin()]
# select rows
kl = par_grid[:, i] == par_left
kr = par_grid[:, i] == par_right
keep *= kl + kr
# delete coincidences
par_new = _np.delete(par, out)
par_grid_new = _np.delete(par_grid, out, axis=1)
return keep, out, inside_ranges, par_new, par_grid_new
[docs]def nan_helper(y):
"""Helper to handle indices and logical indices of NaNs.
Input:
- y, 1d numpy array with possible NaNs
Output:
- nans, logical indices of NaNs
- index, a function, with signature indices= index(logical_indices),
to convert logical indices of NaNs to 'equivalent' indices
Example:
>>> # linear interpolation of NaNs
>>> nans, x = nan_helper(y)
>>> # x is a lambda "sequence" function to interpolation purposes
>>> y[nans]= _np.interp(x(nans), x(~nans), y[~nans])
"""
return _np.isnan(y), lambda z: z.nonzero()[0]
# Angular coordinates and dates manipulation
[docs]def dtflag(ms=False):
"""It returns the current "datetime" flag, i.e., a string of the current
date and time formated as yyyymmdd-hhMM."""
now = _dt.datetime.now()
if not ms:
return "{0}{1:02d}{2:02d}-{3:02d}{4:02d}{5:02d}".format(
now.year, now.month, now.day, now.hour, now.minute, now.second
)
else:
return "{0}{1:02d}{2:02d}-{3:02d}{4:02d}{5:02d}{6:02.0f}".format(
now.year,
now.month,
now.day,
now.hour,
now.minute,
now.second,
now.microsecond / 1e4,
)
[docs]def MJD2greg(mjd):
"""mjd: float
returns YY,MM,DD,dfrac
"""
return _jdcal.jd2gcal(_jdcal.MJD_0, mjd)
[docs]def MJD2datetime(mjd):
"""mjd: float
returns datetime (seconds precision)
"""
greg = MJD2greg(mjd)
hour = int(greg[-1] * 24 // 1)
minute = int((greg[-1] * 24 % 1) * 60 // 1)
second = int(((greg[-1] * 24 % 1) * 60 % 1) * 60 // 1)
return _dt.datetime(
year=greg[0],
month=greg[1],
day=greg[2],
hour=hour,
minute=minute,
second=second,
tzinfo=_tz.gettz("UTC"),
)
[docs]def greg2MJD(yr, mon, day, fracday):
"""It returns the MJD of a given YYYY,MM,DD (integers) and fraction of the
day (float) as a float.
"""
MJD = _jdcal.gcal2jd(yr, mon, day)[1]
return MJD + fracday
[docs]def longdate2MJD(ldate):
"""FROM YYYY-MM-HHThh:mm:ss.sss to MJD (float)."""
ldate, hms = ldate.split("T")
ldate = _np.array(ldate.split("-"), dtype=int)
mjd = _jdcal.gcal2jd(*ldate)[1]
return mjd + hms2fracday(hms)
[docs]def hms2fracday(hms):
"""Enter hour:min:sec (string) and return fraction of a day (float)"""
hms = _np.array(hms.split(":"), dtype="float")
return (hms[0] + hms[1] / 60.0 + hms[2] / 3600) / 24.0
[docs]def hms2sec(hms):
"""Enter hour:min:sec (string) and return it in seconds (float)"""
hms = _np.array(hms.split(":"), dtype="float")
if len(hms) == 3:
return hms[0] * 3600.0 + hms[1] * 60.0 + hms[2]
elif len(hms) == 2:
return hms[0] * 60.0 + hms[1]
else:
_warn.warn("# WRONG hms FORMAT!")
print(hms)
return 0.0
[docs]def fracday2hms(frac):
"""Enter fraction of a day (e.g., MJD) and return integers of hour, min,
sec.
INPUT: float
OUTPUT: hour, min, sec (int, int, int)
"""
hh = frac * 24
if int(hh) > 0:
mm = hh % int(hh)
hh = int(hh)
else:
mm = hh
hh = 0
mm = mm * 60
if int(mm) > 0:
ss = mm % int(mm)
mm = int(mm)
else:
ss = mm
mm = 0
ss = int(round(ss * 60))
return hh, mm, ss
[docs]def raf2ra(raf):
"""Decimal RA (float in deg) to sexagesimal RA (h:m:s)"""
raf /= 15.0
hour, amin = divmod(raf, 1)
amin *= 60
amin, asec = divmod(amin, 1)
asec *= 60
return ("{:02.0f}:{:02.0f}:{:06.3f}").format(hour, amin, asec)
[docs]def decf2dec(decf):
"""Decimal DEC (float) to sexagesimal DEC (deg:m:s; string)"""
deg, amin = divmod(decf, 1)
amin *= 60
amin, asec = divmod(amin, 1)
asec *= 60
return ("{:02.0f}:{:02.0f}:{:06.3f}").format(deg, amin, asec)
[docs]def ra2degf(rastr):
"""RA (sexagemsimal; string) to degrees (decimal; float)"""
rastr = rastr.replace("::", ":")
rastr = rastr.replace(",", ".")
vals = _np.array(rastr.split(":")).astype(float)
return (vals[0] + vals[1] / 60.0 + vals[2] / 3600.0) * 360.0 / 24
[docs]def dec2degf(decstr, delimiter=":"):
"""Sexagesimal (string) to decimal (float)"""
vals = _np.array(decstr.split(delimiter)).astype(float)
if vals[0] < 0:
vals[1:] *= -1
return vals[0] + vals[1] / 60.0 + vals[2] / 3600.0
[docs]def gentkdates(mjd0, mjd1, fact, step, dtstart=None):
"""Generates round dates between ``mjd0`` and ``mjd1`` in a given step.
Valid ``steps`` are:
'd/D/dd/DD' for days;
'w/W/ww/WW' for weeks;
'm/M/mm/MM' for months;
'y/Y/yy/YY/yyyy/YYYY' for years.
dtstart (optional) is expected to be in _datetime._datetime.date() format
[i.e., datetime.date(yyyy, m, d)].
``fact`` must be an integer.
INPUT: float, float, float, int, step (see above), dtstart (see above)
OUTPUT: list of _datetime.date
"""
# check sanity of dtstart
if dtstart is None:
dtstart = _dt.datetime(*_jdcal.jd2gcal(_jdcal.MJD_0, mjd0)[:3]).date()
mjdst = _jdcal.gcal2jd(dtstart.year, dtstart.month, dtstart.day)[1]
else:
mjdst = _jdcal.gcal2jd(dtstart.year, dtstart.month, dtstart.day)[1]
if mjdst < mjd0 - 1 or mjdst > mjd1:
_warn.warn('Invalid "dtstart". Using mjd0.')
dtstart = _dt.datetime(*_jdcal.jd2gcal(_jdcal.MJD_0, mjd0)[:3]).date()
# define step 'position' and vector:
# if dtstart.day > 28:
# dtstart = dtstart.replace(day=28)
basedata = dtstart
dates = []
mjd = mjdst
while mjd < mjd1 + 1:
dates += [basedata]
if step.upper() in ["Y", "YY", "YYYY"]:
basedata += _dtdelta(years=fact)
elif step.upper() in ["M", "MM"]:
basedata += _dtdelta(months=fact)
elif step.upper() in ["W", "WW"]:
basedata += _dtdelta(weeks=fact)
elif step.upper() in ["D", "DD"]:
basedata += _dtdelta(days=fact)
else:
_warn.warn("# ERROR! Invalid step")
raise SystemExit(1)
mjd = _jdcal.gcal2jd(basedata.year, basedata.month, basedata.day)[1]
return dates
[docs]def deg2rad(deg=1.0):
r""":math:`1^\circ=\frac{\pi}{180}` rad."""
return deg * _np.pi / 180.0
[docs]def rad2deg(rad=1.0):
r""":math:`1^\circ=\frac{\pi}{180}` rad."""
return rad * 180.0 / _np.pi
[docs]def arcmin2rad(arm=1.0):
r""":math:`1'=\frac{\pi}{10800}` rad."""
return arm * _np.pi / 10800.0
[docs]def rad2arcmin(rad=1.0):
r""":math:`1'=\frac{\pi}{10800}` rad."""
return rad * 10800.0 / _np.pi
[docs]def arcsec2rad(ars=1.0):
r""":math:`1"=\frac{\pi}{648000}` rad."""
return ars * _np.pi / 648000.0
[docs]def rad2arcsec(rad=1.0):
r""":math:`1"=\frac{\pi}{648000}` rad."""
return rad * 648000.0 / _np.pi
[docs]def mas2rad(mas=1.0):
r"""1 mas :math:`=\frac{\pi}{648000000}` rad."""
return mas * _np.pi / 648000000.0
[docs]def rad2mas(rad=1.0):
r"""1 mas :math:`=\frac{\pi}{648000000}` rad."""
return rad * 648000000.0 / _np.pi
[docs]def deg2mas(deg=1.0):
r""":math:`1^\circ = 3600000` mas."""
return deg * 3600000.0
[docs]def mas2deg(mas=1.0):
r""":math:`1^\circ = 3600000` mas."""
return mas / 3600000.0
# Files manipulation
[docs]def csvread(fname, delimiter=","):
"""Read a CSV file and return it as a list (table). It handles entries
with quotation marks as a single column.
"""
out = []
with open(fname, "r") as f:
reader = _csv.reader(f, delimiter=delimiter)
for row in reader:
out += [row]
return out
def fileread(fname, bin=False):
rmode = "r"
if bin:
rmode = "rb"
ext = _os.path.splitext(fname)[1]
if ext == ".gz":
return _gzip.open(fname, rmode).read()
elif ext == ".bz2":
return _BZ2File(fname, rmode).read()
else:
return open(fname, rmode).read()
[docs]def readfixwd(fname, wlims, chklims=True):
"""``chklims`` : read line from beginning until the end"""
lines = fileread(fname).split("\n")
lsz = [len(ls) for ls in lines]
if wlims[-1] is None or wlims[-1] < 0:
chklims = True
wlims = [int(i - 1) for i in wlims if (i is not None and i >= 0)]
if wlims[0] < 0:
wlims[0] = 0
if chklims and wlims[0] != 0:
wlims = [0] + wlims
if chklims and wlims[-1] < _np.max(lsz) - 1:
wlims += [_np.max(lsz)]
out = _np.chararray(
(len(lines), len(wlims) - 1),
itemsize=_np.max([_np.max(_np.diff(wlims)), wlims[0]]),
)
for j in range(len(lines)):
out[j] = [
lines[j][wlims[i - 1] : wlims[i]].strip().replace(",", "^")
for i in range(1, len(wlims))
]
return out
def readtextable(fname, commentchars="#", splitchars=r"&|\pm", delchars=r"\$" + "\r "):
lines = fileread(fname).split("\n")
out = []
for il in lines:
if len(il) == 0:
continue
if il[0] in commentchars:
continue
for c in delchars:
il = il.replace(c, "")
out.append(_re.split("|".join(splitchars.split("|")), il))
return out
[docs]def sortfile(file, quiet=False):
"""Sort the file."""
f0 = open(file, "r")
lines = f0.readlines()
f0.close()
lines.sort()
f0 = open(file, "w")
f0.writelines(lines)
f0.close()
if not quiet:
print("# File {0} sorted!".format(file))
return
[docs]def outfld(fold="hdt"):
"""
Check and create (if necessary) an (sub)folder - generally used for output.
INPUT: *fold=string
OUTPUT: *system [folder creation]
"""
_warn.warn("# Deprecated! Use `os` tools directly")
if not _os.path.exists(fold):
_os.system("mkdir {0}".format(fold))
return
[docs]def trimpathname(file):
"""Trim the full path string to return path and filename.
INPUT: full file path
OUTPUT: folder path, filename (strings)"""
_warn.warn("# This is Deprecated! Use `os.path.split` instead!")
return list(_os.path.split(file))
[docs]def rmext(name):
"""Remove the extension of a filename.
Criteria: last `.` sets the extension.
INPUT: filename (string)
OUTPUT: filename without extension (string)"""
i = name.rfind(".")
if i == -1:
return name
return name[: name.rfind(".")]
[docs]def readrange(file, i0, ie):
"""Read a specific range of lines of a file with minimal memory use.
Note that i == n-1 for the n-th line.
INPUT: string, int, int
OUTPUT: list of strings"""
_warn.warn("Update this with linecache for better performance")
lines = []
fp = open(file)
for i, line in enumerate(fp):
if i >= i0:
lines += [line]
if i >= ie:
break
fp.close()
return lines
[docs]def splitfilelines(n, file):
"""Break the *file* into *n* files. It also erases the expression "qsub ".
OUTPUT: `file_##.txt`"""
f0 = open(file)
lines = f0.readlines()
f0.close()
lines.sort()
lines = [line.replace("qsub ", "") for line in lines]
outname = trimpathname(file)[1].replace(".sh", "")
N = len(lines)
for i in range(n):
f0 = open("{0}_{1:02d}.txt".format(outname, i), "w")
f0.writelines(lines[i * N / n : (i + 1) * N / n])
f0.close()
print("# {0} files created!".format(n))
return
[docs]def recsearch(root="./", fstr=[""], allfstr=True, fullpath=False):
"""
Do a recursive search in `root` looking for files containg `fstr`.
`fstr` must be a list!
If `allfstr` is True, *all* itens in fstr must be in the path+file names
structure. If False, *any* of the itens must be.
INPUT: root (string), fstr (list of strings)
OUTPUT: list of strings
"""
outflist = []
for i in range(len(fstr)):
fstr[i] = fstr[i].replace("*", "")
for root, subfolders, files in _os.walk(root):
for f in files:
fcomp = f
if fullpath:
fcomp = _os.path.join(root, f)
if allfstr:
if all(x in fcomp for x in fstr):
outflist.append(root + "/" + f)
else:
if any(x in fcomp for x in fstr):
outflist.append(root + "/" + f)
return outflist
[docs]def renlist(root, newr):
"""The routine changes each A_STR_B to A_NEW_B inside the running folder."""
files = _glob("*{0}*".format(root))
files.sort()
for i in range(len(files)):
_os.system('mv "' + files[i] + '" "' + files[i].replace(root, newr) + '"')
print("# " + files[i] + " renamed to: " + files[i].replace(root, newr))
return
[docs]def repl_fline_val(f, iline, oldval, newval):
"""Replace `oldval` by `newval` in `f[iline]`
return `f` replaced.
"""
f[iline] = f[iline].replace(str(oldval), str(newval))
return f
[docs]def output_rename(fullpath, prefix):
"""Replace FILENAME prefix to the one set here.
The routine is based on the '_' character.
Example:
folder/SED_mod1a.sed2 > folder/bin_mod1a.sed2
"""
path, fname = _os.path.split(fullpath)
return _os.path.join(path, prefix + fname[fname.find("_") + 1 :])
# Python-version stuff
def user_input(arg):
if _sys.version_info[0] > 2:
return input(arg)
else:
return raw_input(arg)
# Plot-related
[docs]def add_subplot_axes(ax, rect, axisbg="w"):
"""Generate a subplot in the current axis. **Warning**: the labels with
be proportionally smaller.
Originally in http://stackoverflow.com/questions/17458580/
:Example:
fig = plt.figure()
ax = fig.add_subplot(111)]
rect = [0.2,0.2,0.7,0.7]
ax1 = add_subplot_axes(ax,rect)
:Alternative solution:
# Here the labels have the same size
from mpl_toolkits.axes_grid.inset_locator import inset_axes
inset_axes = inset_axes(parent_axes,
width="30%", # width = 30% of parent_bbox
height=1., # height : 1 inch
loc=3)
# "bbox_to_anchor" gives VERY strange results...
"""
fig = _plt.gcf()
box = ax.get_position()
width = box.width
height = box.height
inax_position = ax.transAxes.transform(rect[0:2])
transFigure = fig.transFigure.inverted()
infig_position = transFigure.transform(inax_position)
x = infig_position[0]
y = infig_position[1]
width *= rect[2]
height *= rect[3] # <= Typo was here
subax = fig.add_axes([x, y, width, height], axisbg=axisbg)
x_labelsize = subax.get_xticklabels()[0].get_size()
y_labelsize = subax.get_yticklabels()[0].get_size()
x_labelsize *= rect[2] ** 0.5
y_labelsize *= rect[3] ** 0.5
subax.xaxis.set_tick_params(labelsize=x_labelsize)
subax.yaxis.set_tick_params(labelsize=y_labelsize)
return subax
[docs]def mag_plot(ax, ylabel=r"mag$_V$", civcfg=[1, "y"], civdt=[1996, 1, 1], civlabel="%Y"):
"""TODO"""
# ax.legend()
# ax.locator_params(axis='x', nbins=6)
# fig.subplots_adjust(left=0.125, right=0.9, bottom=0.1, wspace=0.2)
ax2 = civil_ticks(ax, civcfg=civcfg, civdt=civdt, label=civlabel)
# minorLocator = AutoMinorLocator(2)
# ax.xaxis.set_minor_locator(minorLocator)
ax = enable_minorticks(ax, auto=4, axis="b")
ax2 = enable_minorticks(ax2, auto=4, axis="x")
ax.invert_yaxis()
ax.set_title("Epoch", y=1.05)
ax.set_xlabel("MJD")
ax.set_ylabel(ylabel)
return ax, ax2
[docs]def enable_minorticks(ax, auto=None, axis="x"):
"""Enable minor ticks
:param auto: argument to AutoMinorLocator, to specify a fixed number of
minor intervals per major interval.
:param axis: ['x', 'y', 'both']
"""
minorLocator = _AutoMinorLocator(auto)
if axis[0].lower() in ["x", "b"]:
ax.xaxis.set_minor_locator(minorLocator)
minorLocator = _AutoMinorLocator(auto)
if axis[0].lower() in ["y", "b"]:
ax.yaxis.set_minor_locator(minorLocator)
return ax
[docs]def civil_ticks(
ax, civcfg=[1, "m"], civdt=None, tklab=True, label="%y %b %d", **kwargs
):
"""Add the civil ticks in the axis.
:param civcfg: forces a given timestep between the ticks [`n`, interval].
Interval can be day (`d`), month (`m`) or year (`y`).
:param civdt: sets the initial tick date. Format: [Y, M, D]
:param tklab: if False, the civil date labels are not written.
:param label: define the label for the ``date.strftime`` to be displayed in
the tick labels.
:EXAMPLE:
fig, ax = plt.subplots()
ax.plot(x, y)
ax = phc.civil_ticks(ax)
"""
if civdt is not None:
civdt = _dt.datetime(civdt[0], civdt[1], civdt[2]).date()
mjd0, mjd1 = ax.get_xlim()
dtticks = gentkdates(mjd0, mjd1, civcfg[0], civcfg[1], dtstart=civdt)
mjdticks = [_jdcal.gcal2jd(date.year, date.month, date.day)[1] for date in dtticks]
ax2 = ax.twiny()
ax2.set_xlim(ax.get_xlim())
ax2.set_xticks(mjdticks)
if tklab:
ax2.set_xticklabels([date.strftime(label) for date in dtticks], **kwargs)
else:
ax2.set_xticklabels([])
return ax2
[docs]def savefig(fig, figname=None, fmt=["png"], keeppt=False, dpi=150, transp=True):
"""Standard way of saving a figure in PyHdust."""
if figname is None or figname == "":
figname = dtflag(ms=True)
elif not keeppt:
figname = figname.replace(".", "p")
if _os.path.basename(figname) == figname:
figname = _os.getcwd() + "/" + figname
for f in fmt:
fig.savefig(
figname + ".{0}".format(f), transparent=transp, bbox_inches="tight", dpi=dpi
)
print("# Saved {1}.{0}".format(f, figname))
_plt.close(fig)
return
[docs]def normGScale(val, vmin=None, vmax=None, log=False):
"""Return the normalized values of a given array to 0 and 255 (gray
scale).
If `log` then the normalization is done in this scale.
If `min` and `max` are not set, it is assumed that the values are from the
list.
.. code::
>>> phc.normGScale(np.linspace(0,10,10), 0, 10)
array([ 0, 28, 57, 85, 113, 142, 170, 198, 227, 255])
>>> phc.normGScale(np.linspace(0,10,10))
array([ 0, 28, 57, 85, 113, 142, 170, 198, 227, 255])
>>> phc.normGScale(np.linspace(0,10,10), 0, 10, log=True)
array([ 0, 0, 80, 128, 161, 187, 208, 226, 241, 255])
>>> phc.normGScale(np.logspace(0,1,10), 0, 10, log=True)
array([ 0, 28, 57, 85, 113, 142, 170, 198, 227, 255])
>>> phc.normGScale(np.logspace(0,1,10), 0, 10)
array([ 26, 33, 43, 55, 71, 92, 118, 153, 197, 255])
>>> phc.normGScale(np.logspace(0,1,10))
array([ 0, 8, 19, 33, 51, 73, 103, 142, 191, 255])
"""
if len(val) == 1 and (vmin is None or vmax is None):
_warn.warn("Wrong normGScale call!!")
return 127
#
val = _np.array(val).astype(float)
if vmin is None:
vmin = _np.min(val)
if vmax is None:
vmax = _np.max(val)
#
if not log:
val = (val - vmin) / (vmax - vmin) * 255
else:
if vmin <= 0:
vmin = _np.min(val[_np.where(val > 0)])
val[_np.where(val <= 0)] = vmin
val = (
(_np.log(_np.array(val).astype(float)) - _np.log(vmin))
/ (_np.log(vmax) - _np.log(vmin))
* 255
)
return _np.round(val).astype(int)
[docs]def gradColor(val, cmapn="jet", vmin=None, vmax=None, log=False):
"""Return the corresponding value(s) color of a given colormap.
Good options, specially for lines, are 'jet', 'gnuplot', 'brg',
'cool' and 'gist_heat' (attention! Here vmax is white!).
.. code-block:: python
cor = phc.gradColor(arange(10), cmapn='gist_heat')
for i in range(0,10):
cor = phc.gradColor(arange(10), cmapn='gist_heat')[i]
print cor
plt.plot(arange(5)+i, color=cor, label='GB='+('{0:4.2f},'*3).
format(*cor)[:-1])
plt.legend(fontsize=8)
.. image:: _static/phc_gradColor.png
:width: 512px
:align: center
:alt: phc.gradColor example
"""
val = normGScale(val, vmin=vmin, vmax=vmax, log=log)
cmap = _plt.get_cmap(cmapn)
return cmap(val)
[docs]def cycles(i=0, ctype="cor"):
"""Cycle between values of the phc.colors, phc.line_styles and
phc.filled_markers lists.
INPUT: valid ctypes are: ['cor','ls','mk']
OUTPUT: the corresponding value of the list."""
if ctype not in ["cor", "ls", "mk"]:
_warn.warn("Invalid ctype calling phc.cycles!")
return
elif ctype == "cor":
return colors[_np.mod(i, len(colors))]
elif ctype == "ls":
return line_styles[_np.mod(i, len(line_styles))]
elif ctype == "mk":
return filled_markers[_np.mod(i, len(filled_markers))]
[docs]def dashes(i=0):
"""Dashes scheme for plot"""
i = int(_np.mod(i, 7))
if i == 0:
return []
if i < 4:
return (32 / 2**i, 4, 4, 4)
if i < 7:
return (32 / 2 ** (i - 2), 4, 2, 4)
colors = [
"Black",
"Blue",
"Green",
"red",
"orange",
"brown",
"purple",
"gray",
"dodgerblue",
"lightgreen",
"tomato",
"yellow",
"peru",
"MediumVioletRed",
"LightSteelBlue",
"cyan",
"darkred",
"olive",
]
filled_markers = ["o", "v", "^", "<", ">", "8", "s", "p", "*", "h", "H", "D", "d"]
line_styles = ["-", "--", "-.", ":"]
# Physical functions
[docs]def n_air(lbd=0.55, T=300.15, P=1013.25, wp=6.113):
"""Air refraction index.
:param lbd: wavelength in microms
:param T: temperature in K
:param P: atmospheric pressure in mbar
:param wp: water vapor pressure in mbar
For all practical purposes, the refractive index is affected only by air
temperature.
"""
return 1 + 77.6e-6 / T * (1 + 7.52e-3 * lbd**-2) * (P + 4810 * wp / T)
[docs]def BBlbd(T, lbd=None):
"""Black body radiation as function of lambda. CGS units (erg s-1 sr−1
cm−3).
INPUT: lambda vector in cm. If None, lbd = _np.arange(1000, 10000, 100) *
1e-8 #Angs -> cm"""
if lbd is None:
lbd = _np.arange(1000, 10000.0, 100) * 1e-8 # Angs -> cm
ft = h.cgs * c.cgs / (lbd * kB.cgs * T)
return 2 * h.cgs * c.cgs**2 / (lbd**5 * (_np.exp(ft) - 1))
[docs]def fBBcor(T):
"""Correction as appendix B Vieira+2015. Stellar atmospheric models
systematically have a LOWER flux than a BB of a given Teff temperature in
IR (at least for Early-type stars).
fBBcor(T)*BBlbd(Teff) == Kurucz(Teff)
INPUT: Teff (Kurucz). log g = 4.0
OUTPUT: fBBcor(T) < 1.0"""
return 1.015 - 0.301 * (T / 1e4) + 0.064 * (T / 1e4) ** 2.0
[docs]def gbf(T, lbd):
r"""Gaunt factors from Vieira+2015.
INPUT: T (K) and lbd (:math:`\mu`m, array)
log(T /K) G0 G1 G2 B0 B1 B2
"""
vals = _np.array(
[
3.70,
0.0952,
0.0215,
0.0145,
2.2125,
-1.5290,
0.0563,
3.82,
0.1001,
0.0421,
0.0130,
1.6304,
-1.3884,
0.0413,
3.94,
0.1097,
0.0639,
0.0111,
1.1316,
-1.2866,
0.0305,
4.06,
0.1250,
0.0858,
0.0090,
0.6927,
-1.2128,
0.0226,
4.18,
0.1470,
0.1071,
0.0068,
0.2964,
-1.1585,
0.0169,
4.30,
0.1761,
0.1269,
0.0046,
-0.0690,
-1.1185,
0.0126,
]
).reshape((6, -1))
if T < 5000 or T > 22500:
_warn.warn("# ERROR! Invalid temperature for Gaunt factors " "calculation!")
return _np.zeros(len(lbd)), _np.zeros(len(lbd))
elif T >= 5000 and T < 10 ** vals[0, 0]:
_warn.warn("Extrapolated Gaunt factors!!")
g0, g1, g2, b0, b1, b2 = vals[0, 1:]
elif T <= 22500 and T > 10 ** vals[-1, 0]:
_warn.warn("Extrapolated Gaunt factors!!")
g0, g1, g2, b0, b1, b2 = vals[-1, 1:]
else:
i = _np.where(
vals[:, 0] == find_nearest(vals[:, 0], _np.log10(T), bigger=False)
)[0]
# print i, vals[i,0]
g0 = interLinND(
[_np.log10(T)], [vals[i, 0]], [vals[i + 1, 0]], vals[i : i + 2, 1]
)
g1 = interLinND(
[_np.log10(T)], [vals[i, 0]], [vals[i + 1, 0]], vals[i : i + 2, 2]
)
g2 = interLinND(
[_np.log10(T)], [vals[i, 0]], [vals[i + 1, 0]], vals[i : i + 2, 3]
)
b0 = interLinND(
[_np.log10(T)],
[vals[i, 0]],
[vals[i + 1, 0]],
vals[i : i + 2, 4],
disablelog=True,
)
b1 = interLinND(
[_np.log10(T)],
[vals[i, 0]],
[vals[i + 1, 0]],
vals[i : i + 2, 5],
disablelog=True,
)
b2 = interLinND(
[_np.log10(T)], [vals[i, 0]], [vals[i + 1, 0]], vals[i : i + 2, 6]
)
return _np.exp(g0 + g1 * _np.log(lbd) + g2 * _np.log(lbd) ** 2), _np.exp(
b0 + b1 * _np.log(lbd) + b2 * _np.log(lbd) ** 2
)
[docs]def lawkep(M=None, m=None, P=None, a=None):
"""Kepler law calc. Kepp `None` on what you what to calc.
Units are in *Solar System* one, what is, masses in Msun and `P` in years.
`a` is the distance between the two bodies, measured in AU.
One can also use the Centre-of-Mass equation a1*M1 = a2*M2 to relate the
two masses.
"""
Gc = G.cgs
pi2_4 = 4 * _np.pi**2
if sum([v is None for v in (M, m, P, a)]):
print("# Wrong call of phc.lawkep! Only one `None` argument is allowed.")
return None
elif M is None:
m *= Msun.cgs
P *= yr.cgs
a *= au.cgs
return (pi2_4 * a**3 / P**2 / Gc - m) / Msun.cgs
elif m is None:
M *= Msun.cgs
P *= yr.cgs
a *= au.cgs
return (pi2_4 * a**3 / P**2 / Gc - M) / Msun.cgs
elif P is None:
M *= Msun.cgs
m *= Msun.cgs
a *= au.cgs
return _np.sqrt(pi2_4 * a**3 / (M + m) / Gc) / yr.cgs
elif a is None:
M *= Msun.cgs
m *= Msun.cgs
P *= yr.cgs
return (P**2 * Gc * (M + m) / pi2_4) ** (1.0 / 3) / au.cgs
else:
print("# Wrong call of phc.lawkep! Keep `None` to calc that qtt.")
return None
[docs]class Constant(object):
"""Class for a physical/astronomical constant"""
def __init__(self, cgs, SI, unitscgs="", info="No available description"):
self.cgs = cgs
self.SI = SI
self.unitscgs = unitscgs
self.info = info
def __repr__(self):
return str("{0:.7e} in {1} (cgs)".format(self.cgs, self.unitscgs))
# From CODATA/NIST in May/2016 (related to Mohr+2015)
# http://arxiv.org/abs/1507.07956
nA = Constant(6.022140857e23, 6.022140857e23, "", "Avogadro's constant")
amu = Constant(1.66053904e-24, 1.66053904e-27, "g", "Unified atomic mass unit")
me = Constant(9.10938356e-28, 9.10938356e-31, "g", "Mass of electron")
mp = Constant(1.672621898e-24, 1.672621898e-27, "g", "Mass of proton")
mn = Constant(1.00137841898 * mp.cgs, 1.00137841898 * mp.SI, "g", "Mass of neutron")
h = Constant(6.62607004e-27, 6.62607004e-34, "erg s", "Planck constant")
eV = Constant(1.6021766208e-12, 1.6021766208e-19, "erg", "Electron volt")
kB = Constant(1.38064852e-16, 1.38064852e-23, "erg K-1", "Boltzmann constant")
alpha = Constant(7.2973525664e-3, 7.2973525664e-3, "", "Fine structure constant")
Rinf = Constant(109737.31568, 10973731.568, "cm-1", "Rydberg constant")
sigT = Constant(6.6524587158e-25, 6.6524587158e-29, "cm2", "Thomson cross section")
# From Luzum et al., 2011
au = Constant(1.49597870700e13, 1.49597870700e11, "cm", "Astronomical unit")
# From Prsa & Harmanec, 2012
G = Constant(6.67384e-8, 6.67384e-11, "cm3 g-1 s-2", "Gravitational constant")
Mea = Constant(398600.4418e15 / G.cgs, 398600.4418e9 / G.SI, "g", "Earth mass")
Rea = Constant(6371.0e5, 6371.0e3, "cm", "Earth radius")
Mju = Constant(126686535e15 / G.cgs, 126686535e9 / G.SI, "g", "Jupiter mass")
Rju = Constant(71492.0e5, 71492.0e3, "cm", "Jupiter radius")
c = Constant(2.99792458e10, 299792458.0, "cm s-1", "speed of light in vacuum")
sigma = Constant(
5.670400e-5, 5.670400e-8, "erg cm-2 K-4 s-1", "Stefan-Boltzmann constant"
)
Msun = Constant(1.988547e33, 1.988547e30, "g", "Solar mass")
Rsun = Constant(6.95508e10, 695508e3, "cm", "Solar radius")
Lsun = Constant(3.846e33, 3.846e26, "erg s-1", "Solar luminosity")
Tsun = Constant(5779.57, 5779.57, "K", "Solar Temperature")
# Derived quantities
# In astronomy, the Julian year is defined as 365.25 days of exactly 86400 SI
# seconds each, totalling exactly 31557600 SI (IAU style manual, 1989,
# Wilkins)
yr = Constant(60 * 60 * 24 * 365.25, 60 * 60 * 24 * 365.25, "sec", "year")
# For the Gregorian calendar the average length of the calendar year (the mean
# year) across the complete leap cycle of 400 years is 365.2425 days!
gyr = Constant(60 * 60 * 24 * 365.2425, 60 * 60 * 24 * 365.2425, "sec", "year")
ly = Constant(yr.cgs * c.cgs, yr.SI * c.SI, "cm", "Light year")
pc = Constant(
au.cgs * 60 * 60 * 180 / _np.pi, au.SI * 60 * 60 * 180 / _np.pi, "cm", "Parsec"
)
e = Constant(
10 * c.cgs * 1.6021766208e-19, 1.6021766208e-19, "esu", "Elementary charge"
)
a = Constant(
4 * sigma.cgs / c.cgs,
4 * sigma.SI / c.SI,
"erg cm-3 K-4",
"Radiation density constant",
)
hbar = Constant(
h.cgs / 2 / _np.pi, h.SI / 2 / _np.pi, "erg s", "Planck constant/(2*pi)"
)
# From Wikipedia
ep0 = Constant(1.0, 8.854187187e-12, "", "Permittivity of Free Space")
mH = Constant(1.00794 * amu.cgs, 1.00794 * amu.SI, "g", "Mass of hydrogen")
# MAIN ###
if __name__ == "__main__":
pass