Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Geometry update by motor positions of quadrants #269

Merged
merged 8 commits into from
Apr 24, 2024
Merged
16 changes: 13 additions & 3 deletions extra_geom/crystfel_fmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
;
; See: http://www.desy.de/~twhite/crystfel/manual-crystfel_geometry.html

{motors}
{paths}
{frame_dim}
res = {resolution} ; pixels per metre
Expand Down Expand Up @@ -73,12 +74,13 @@ def frag_to_crystfel(fragment, p, a, ss_slice, fs_slice, dims, pixel_size):
min_fs=fs_slice.start,
max_fs=fs_slice.stop - 1,
ss_vec=_crystfel_format_vec(fragment.ss_vec / pixel_size),
fs_vec=_crystfel_format_vec(fragment.fs_vec/ pixel_size),
fs_vec=_crystfel_format_vec(fragment.fs_vec / pixel_size),
corner_x=c[0],
corner_y=c[1],
coffset=fragment.corner_pos[2],
)


def write_crystfel_geom(
self, filename, *,
data_path='/entry_1/instrument_1/detector_1/data',
Expand Down Expand Up @@ -106,6 +108,11 @@ def write_crystfel_geom(
else:
photon_energy_str = 'photon_energy = {}'.format(photon_energy)

if hasattr(self, "motors_to_geom") and callable(self.motors_to_geom):
motors = self.motors_to_geom() + "\n"
else:
motors = ""

# Get the frame dimension
tile_dims = {}

Expand Down Expand Up @@ -151,7 +158,8 @@ def write_crystfel_geom(
resolution=resolution,
adu_per_ev=adu_per_ev_str,
clen=clen_str,
photon_energy=photon_energy_str
photon_energy=photon_energy_str,
motors=motors,
))
f.write(format_bad_regions(
bad_regions,
Expand All @@ -163,12 +171,13 @@ def write_crystfel_geom(
for chunk in panel_chunks:
f.write(chunk)


def format_bad_regions(bad_regions: dict, mod_ss_pixels: int, layout_2d=False):
lines = []
for name, d in bad_regions.items():
if d['is_fsss']:
if layout_2d:
modno = int(re.match("p(\d+)a\d+", d['panel'])[1])
modno = int(re.match(r"p(\d+)a\d+", d['panel'])[1])
mod_offset = modno * mod_ss_pixels
min_ss = d['min_ss'] + mod_offset
max_ss = d['max_ss'] + mod_offset
Expand All @@ -192,6 +201,7 @@ def format_bad_regions(bad_regions: dict, mod_ss_pixels: int, layout_2d=False):
]
return "\n".join(lines)


def get_rigid_groups(geom, nquads=4):
"""Create string for rigid groups definition."""

Expand Down
22 changes: 19 additions & 3 deletions extra_geom/detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from .base import DetectorGeometryBase, GeometryFragment
from .snapped import isinstance_no_import
from .motors import MotorMixin


class GenericGeometry(DetectorGeometryBase):
"""A generic detector layout based either on the CrystFEL geom file or on a set of parameters.
Expand Down Expand Up @@ -191,7 +193,7 @@ def from_crystfel_geom(cls, filename):
raise NotImplementedError


class AGIPD_1MGeometry(DetectorGeometryBase):
class AGIPD_1MGeometry(MotorMixin, DetectorGeometryBase):
"""Detector layout for AGIPD-1M

The coordinates used in this class are 3D (x, y, z), and represent metres.
Expand All @@ -209,6 +211,20 @@ class AGIPD_1MGeometry(DetectorGeometryBase):
n_tiles_per_module = 8
_pyfai_cls_name = 'AGIPD1M'

# motors related properties
n_movable_groups = 4
n_motor_per_group = 2
movable_groups = [
# Q1, Q2, Q3, Q4
np.s_[0:4], np.s_[4:8], np.s_[8:12], np.s_[12:16],
]
motor_axes = np.array([
[[-1, 0], [0, -1]], # Q1
[[-1, 0], [0, +1]], # Q2
[[+1, 0], [0, +1]], # Q3
[[+1, 0], [0, -1]], # Q4
])

@classmethod
def from_quad_positions(cls, quad_pos, asic_gap=2, panel_gap=29,
unit=pixel_size):
Expand Down Expand Up @@ -485,8 +501,8 @@ def from_origin(cls, origin=(0, 0), asic_gap=None, panel_gap=None,
"""
origin = np.asarray(origin) * unit
px_conversion = unit / cls.pixel_size
asic_gap = 2 if (asic_gap is None) else asic_gap * px_conversion
panel_gap = (16, 30) if (panel_gap is None) else np.asarray(panel_gap) * px_conversion
asic_gap = 2 if (asic_gap is None) else asic_gap * px_conversion
panel_gap = (16, 30) if (panel_gap is None) else np.asarray(panel_gap) * px_conversion

panel_gap_x = panel_gap[0] * cls.pixel_size
panel_gap_y = panel_gap[1] * cls.pixel_size
Expand Down
266 changes: 266 additions & 0 deletions extra_geom/motors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
import io

import numpy as np


def read_motors_from_data(dc, n_groups, n_motors, position_key, atol=0.001):
""" Read motors from experiment data.

The function reads motor position of ``n_motors`` motors for
``n_groups`` movable groups (e.g. quadrants) from experimental
data in EXDF format.

::
# open run
run = open_run(propno, runno)

# create function returning (source, key)
# to read motor `m` for group `q`
position_key = lambda q, m: (
f"SPB_IRU_AGIPD1M/MOTOR/Q{q+1}M{m+1}",
"actualPosition"
)

# read motor positions
motors = read_motors_from_data(run, 4, 2, position_key)

# add aliases
run2 = run.with_aliases({
f"motor_q{q+1}m{m+1}": (
f"SPB_IRU_AGIPD1M/MOTOR/Q{q+1}M{m+1}",
"actualPosition"
)
for q, m in product(range(4), range(2))
})

# read motor positions using aliases
motors = read_motors_from_data(
run2.alias, 4, 2, lambda q, m: f"motor_q{q+1}m{m+1}")

Parameters
----------
dc: extra_data.DataCollection or extra_data.AliasIndexer
Experimental data
n_groups: int
The number of movable groups
n_motors: int
The number of motors per group
position_key: callable
A function `position_key(q, m)` returning an indentificator of
motor position property for motor `m` in group `q`. If the `dc`
is DataCollection, then the identificator is a tuple of source
and key. If the `dc` is AliasIndexer, then - alias

Returns
-------
numpy.ndarray:
an array with shape (n_groups, n_motors)
"""
positions = [

Check warning on line 59 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L59

Added line #L59 was not covered by tests
[
dc[position_key(q, m)].as_single_value(atol=atol)
for m in range(n_motors)
] for q in range(n_groups)
]
return np.array(positions)

Check warning on line 65 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L65

Added line #L65 was not covered by tests


def motors_to_geom(positions):
"""Prints the motor positions in the text format."""
n_groups, n_motors = positions.shape
meta_lines = [f";XGEOM MOTORS={n_groups},{n_motors}"]
meta_lines += [
f";XGEOM MOTOR_Q{q+1}=" + ",".join(
(str(positions[q, m]) for m in range(n_motors))
) for q in range(n_groups)
]
return "\n".join(meta_lines) + "\n"


def read_motors_from_geom(text):
"""Reads the motor positions from the text format."""
if isinstance(text, str):
file = io.StringIO(text)
else:
file = text

meta = {
line[0]: line[2]
for line in (
line[7:].partition('=')
for line in file if line.startswith(";XGEOM ")
)
}
try:
n_groups, n_motors = (int(n) for n in meta["MOTORS"].split(','))
except KeyError:
raise ValueError("MOTORS record is not found")
except ValueError:
raise ValueError(

Check warning on line 99 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L98-L99

Added lines #L98 - L99 were not covered by tests
"Invalid MOTORS format, expected two comma-separated integers")

positions = []
for q in range(n_groups):
try:
key = f"MOTOR_Q{q+1}"
pos = [float(n) for n in meta[key].split(',')]
except KeyError:
raise ValueError(key + " record is not found")
except ValueError:
raise ValueError(

Check warning on line 110 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L107-L110

Added lines #L107 - L110 were not covered by tests
f"Invalid {key} format, expected {n_motors} "
"comma-separated floats")
if len(pos) != n_motors:
raise ValueError(

Check warning on line 114 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L114

Added line #L114 was not covered by tests
f"Wrong length of {key}, expected {n_motors} floats")

positions.append(pos)

return np.array(positions)


class MotorMixin:
n_movable_groups = 4
n_motor_per_group = 2
motor_position_shape = (4, 2)
motor_axis_shape = (4, 2, 2)

# groups of modules driven by motors together
# Q1, Q2, Q3, Q4
movable_groups = [
np.s_[0:4], np.s_[4:8], np.s_[8:12], np.s_[12:16],
]

# transformation matrix (h,v) -> (x,y), where
# (h, v) - local motor coordinates
# (x, y) - laboratory cooridnates (looking downstream)
# | hx vx | |h|
# | hy vy | |v|
motor_axes = np.array([
[[-1, 0], [0, -1]], # Q1
[[-1, 0], [0, +1]], # Q2
[[+1, 0], [0, +1]], # Q3
[[+1, 0], [0, -1]], # Q4
])

# motor positions in local motor coordinates (h, v)
# for each movable group of modules
# [[Q1M1, Q1M2], ..., [Q4M1, Q4M2]]
# motor_positions = np.array([
# [0, 0], [0, 0], [0, 0], [0, 0],
# ])

def __init__(self, modules, filename='No file', metadata=None):
super().__init__(modules, filename, metadata)

self.motor_axes_shape = (
self.n_movable_groups,
2,
self.n_motor_per_group
)
self.motor_position_shape = (
self.n_movable_groups,
self.n_motor_per_group
)
try:
with open(filename) as f:
self.motor_positions = read_motors_from_geom(f)
except (ValueError, FileNotFoundError):
Fixed Show fixed Hide fixed
pass

def set_motor_positions(self, new_motor_positions):
"""Set motor positions for the geometry.

Parameters
----------
new_motor_positions: array or list
New motor positions as array of the number of movable groups
(quadrants) by the number of motor per group.
"""
new_motor_positions = np.array(new_motor_positions, copy=True)
if new_motor_positions.shape != self.motor_position_shape:
raise ValueError(f"Expects array{self.motor_position_shape}: "

Check warning on line 182 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L182

Added line #L182 was not covered by tests
f"{self.n_movable_groups} groups moving by "
f"{self.n_motor_per_group} motor each.")
self.motor_positions = new_motor_positions

def set_motor_axes(self, new_motor_axes):
egorsobolev marked this conversation as resolved.
Show resolved Hide resolved
"""Set the matrices of transformation motor positions in
the positions of detector panels.

::
(h, v) - local motor coordinates
(x, y) - laboratory cooridnates (looking downstream)
(hx, hy) - the axis of horizontal motor in laboratory coordinates
(vx, vy) - the axis of vertical motor in laboratory coordinates

|x| _ | hx vx | |h|
|y| ‾ | hy vy | |v|

Parameters
----------
new_motor_axes: array or list
New matrices of motor axes (transmation matrices). The matrices
are expected as three dimention array of the number of movable
groups (quadrants) by the number of panel coordinates (two) by
the number of motors per group.
"""
new_motor_axes = np.array(new_motor_axes, copy=True)
if new_motor_axes.shape != self.motor_axes_shape:
raise ValueError(f"Expects array{self.motor_axes_shape}: "

Check warning on line 210 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L208-L210

Added lines #L208 - L210 were not covered by tests
f"{self.n_movable_groups} groups moving by "
f"{self.n_motor_per_group} motor each.")
self.motor_axes = new_motor_axes

Check warning on line 213 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L213

Added line #L213 was not covered by tests

def move_by_motors(self, new_motor_positions):
"""Move the geometry according to the given motor positions.

This changes the geometry according to the given motor positions
with respect the current motor position. If the geometry does not
have current motor positions, then this assumes that all motors
are in zero positions.

Parameters
----------
new_motor_positions: array or list
New motor positions as array of the number of movable groups
(quadrants) by the number of motor per group.

Returns
-------
geometry: the same class as self
a new geometry
"""
new_motor_positions = np.array(new_motor_positions, copy=True)
if new_motor_positions.shape != self.motor_position_shape:
raise ValueError(f"Expects array{self.motor_position_shape}: "

Check warning on line 236 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L236

Added line #L236 was not covered by tests
f"{self.n_movable_groups} groups moving by "
f"{self.n_motor_per_group} motor each.")

new_geom = self.offset((0, 0))
if hasattr(self, "motor_positions"):
motor_diff = (new_motor_positions - self.motor_positions) * 1e-3

Check warning on line 242 in extra_geom/motors.py

View check run for this annotation

Codecov / codecov/patch

extra_geom/motors.py#L242

Added line #L242 was not covered by tests
else:
motor_diff = new_motor_positions * 1e-3

for i in range(self.n_movable_groups):
det_diff = self.motor_axes[i] @ motor_diff[i]
new_geom = new_geom.offset(
det_diff, modules=self.movable_groups[i])

new_geom.motor_positions = new_motor_positions
return new_geom

def motors_to_geom(self):
"""Format the current motor position as text ready to store in Crystfel
geometry file.

Returns
-------
str:
text with motor positions.
"""
if hasattr(self, "motor_positions"):
return motors_to_geom(self.motor_positions)
else:
return ""
Loading
Loading