Source code for simba.Codes.Xsuite.Xsuite

"""
SIMBA Xsuite Module

Various objects and functions to handle Xsuite lattices and commands. See `Xsuite github`_ for more details.

    .. _Xsuite github: https://github.com/xsuite

Classes:
    - :class:`~simba.Codes.Xsuite.Xsuite.xsuiteLattice`: The Xsuite lattice object, used for
    converting the :class:`~simba.Framework_objects.frameworkObject` s defined in the
    :class:`~simba.Framework_objects.frameworkLattice` into an Xsuite lattice object,
    and for tracking through it.

"""
try:
    import cupy as cp
    has_cupy = True
except ImportError:
    has_cupy = False

from ...Framework_objects import frameworkLattice, getGrids
from ...Modules import Beams as rbf
from copy import deepcopy
import numpy as np
import json

from typing import Dict, List, Any, Literal


[docs] class xsuiteLattice(frameworkLattice): """ Class for defining the Xsuite lattice object, used for converting the :class:`~simba.Framework_objects.frameworkObject`s defined in the :class:`~simba.Framework_objects.frameworkLattice` into an Xsuite lattice object, and for tracking through it. """ code: str = "xsuite" """String indicating the lattice object type""" trackBeam: bool = True """Flag to indicate whether to track the beam. If False, run in single-particle mode; the beam is not tracked and the output beam distributions will be generated from Gaussians.""" names: List = None """Names of elements in the lattice""" particle_definition: str = None """Initial particle distribution as a string""" final_screen: Any = None """Final screen object""" env: Any = None """Xsuite environment object""" line: Any = None """Xsuite line object""" pin: Any | None = None """Xsuite input particle distribution""" pout: Any | None = None """Xsuite output particle distribution""" tws: Any | None = None """Xsuite Twiss Table output""" matrices: Dict | None = None """Dictionary of R-matrices produced by tracking""" context: Any | None = None """Xsuite context object""" beam_data: Dict = {} """Data containing beam statistics at each element""" grids: getGrids = None """Class for calculating the required number of space charge grids""" pic_solver: Literal['FFTSolver2p5DAveraged'] = 'FFTSolver2p5DAveraged' """PIC solver to use for space charge calculations""" ref_s: float = None """Reference s position""" ref_idx: int = None """Reference particle index""" def model_post_init(self, __context): super().model_post_init(__context) import xobjects as xo if ( "input" in self.file_block and "particle_definition" in self.file_block["input"] ): if ( self.file_block["input"]["particle_definition"] == "initial_distribution" ): self.particle_definition = "laser" else: self.particle_definition = self.file_block["input"][ "particle_definition" ] else: self.particle_definition = self.start if has_cupy: self.context = xo.ContextCupy() else: self.context = xo.ContextCpu() self.grids = getGrids()
[docs] def writeElements(self) -> None: """ Create Xsuite objects for all the elements in the lattice and set the :attr:`~simba.Codes.Xsuite.Xsuite.xsuiteLattice.lat_obj` and :attr:`~simba.Codes.Xsuite.Xsuite.xsuiteLattice.names`. """ import xtrack as xt self.env = xt.Environment() particle_ref = xt.Particles( p0c=[self.global_parameters["beam"].centroids.mean_cp.val], mass0=[self.global_parameters["beam"].particle_rest_energy_eV.val], q0=-1, zeta=0.0, ) beam_length = len(self.global_parameters["beam"].x.val) self.line = self.section.to_xsuite(beam_length, env=self.env, particle_ref=particle_ref, save=True) self.names = self.line.element_names
[docs] def setup_collective_effects(self) -> None: import xfields as xf if "charge" in list(self.file_block.keys()): if ( "space_charge_mode" in list(self.file_block["charge"].keys()) and self.file_block["charge"]["space_charge_mode"].lower() == "3d" ): be = self.global_parameters["beam"] gridsize = self.grids.getGridSizes( (len(be.x) / self.sample_interval) ) sigma_z = be.sigmas.sigma_z.val lprofile = xf.LongitudinalProfileQGaussian( number_of_particles=len(be.x), sigma_z=sigma_z, ) xf.install_spacecharge_frozen( line=self.line, longitudinal_profile=lprofile, nemitt_x=be.emittance.normalized_horizontal_emittance.val, nemitt_y=be.emittance.normalized_horizontal_emittance.val, sigma_z=sigma_z, num_spacecharge_interactions=len(self.getNames()) ) pic_collection, all_pics = xf.replace_spacecharge_with_PIC( line=self.line, n_sigmas_range_pic_x=8, n_sigmas_range_pic_y=8, nx_grid=gridsize, ny_grid=gridsize, nz_grid=gridsize, n_lims_x=7, n_lims_y=3, z_range=(-3 * sigma_z, 3 * sigma_z), solver=self.pic_solver)
[docs] def write(self) -> None: """ Create the lattice object via :func:`~simba.Codes.Xsuite.Xsuite.xsuiteLattice.writeElements` and save it as a python file to `master_subdir`. """ self.writeElements()
[docs] def preProcess(self) -> None: """ Get the initial particle distribution defined in `file_block['input']['prefix']` if it exists. """ super().preProcess() prefix = self.get_prefix() prefix = prefix if self.trackBeam else prefix + self.particle_definition self.ref_s = self.global_parameters["beam"].s self.read_input_file(prefix, self.particle_definition) self.ref_idx = self.global_parameters["beam"].reference_particle_index self.hdf5_to_json(prefix)
[docs] def hdf5_to_json(self, prefix: str = "", write: bool = True) -> None: """ Convert the initial HDF5 particle distribution to Xsuite format and set :attr:`~simba.Codes.Xsuite.Xsuite.xsuiteLattice.pin` accordingly. Parameters ---------- prefix: str Prefix for particle file write: bool Flag to indicate whether to save the file """ xsuitebeamfilename = self.global_parameters["master_subdir"] + "/" + self.particle_definition + ".xsuite.json" self.pin = rbf.beam.write_xsuite_beam_file( self.global_parameters["beam"], xsuitebeamfilename, write=write, s_start=self.startObject.physical.start.z, )
[docs] def run(self) -> None: """ Run the code, and set :attr:`~tws` and :attr:`~pout` """ import xtrack as xt from xtrack import Cavity self.line.build_tracker(_context=self.context) self.line.freeze_longitudinal(state=False) self.line.freeze_energy(state=False, force=True) pin = deepcopy(self.pin) for el, name in zip(self.line.elements, self.line.element_names): if isinstance(el, Cavity): xt.ReferenceEnergyIncrease( Delta_p0c=el.voltage * np.sin(el.lag * np.pi / 180) ).track(pin) pin.zeta -= np.mean(pin.zeta) # Center zeta el.track(pin, increment_at_element=True) # Track in-place stats = { 'mean_x': np.mean(pin.x), 'mean_y': np.mean(pin.y), 'sigma_x': np.std(pin.x), 'sigma_px': np.std(pin.px), 'sigma_y': np.std(pin.y), 'sigma_py': np.std(pin.py), 'sigma_zeta': np.std(pin.zeta), 'sigma_delta': np.std(pin.delta) * np.mean(pin.energy), 'momentum': np.mean(pin.energy) - pin.mass0, 'emit_xn': np.mean(self.compute_norm_emit(pin.x, pin.px, pin)), 'emit_yn': np.mean(self.compute_norm_emit(pin.y, pin.py, pin)), } self.beam_data.update({name: stats}) self.beam_data.update({"_end_point": stats}) self.pout = pin self.tws = self.line.twiss( betx=self.global_parameters["beam"].twiss.beta_x.val, alfx=self.global_parameters["beam"].twiss.alpha_x.val, bety=self.global_parameters["beam"].twiss.beta_y.val, alfy=self.global_parameters["beam"].twiss.alpha_y.val, compute_R_element_by_element=False, method="6d", freeze_energy=False, )
[docs] def compute_norm_emit(self, coord, mom, particles): cov = np.cov(coord, mom) emit = np.sqrt(cov[0, 0] * cov[1, 1] - cov[0, 1] ** 2) gamma = particles.energy / particles.mass0 beta = particles.beta0 * (1 + particles.delta.mean()) / (1 + particles.delta.mean() * particles.beta0 ** 2) return gamma * beta * emit
[docs] def postProcess(self) -> None: """ Convert the outputs from Ocelot to HDF5 format and save them to `master_subdir`. """ import xobjects as xo super().postProcess() bfname = f'{self.global_parameters["master_subdir"]}/{self.end}.xsuite.json' with open(bfname, 'w') as fid: json.dump(self.pout.to_dict(), fid, cls=xo.JEncoder) beam = deepcopy(self.global_parameters["beam"]) svals = self.getSValues(as_dict=True) beam.read_xsuite_beam_file( bfname, zstart=self.endObject.physical.middle.z, s=svals[self.end], ref_index=self.ref_idx ) rbf.openpmd.write_openpmd_beam_file( beam, f'{self.global_parameters["master_subdir"]}/{self.end}.openpmd.hdf5', ) for elem in self.screens_and_bpms: fname = f'{self.global_parameters["master_subdir"]}/{elem.name}.xsuite.json' with open(fname, 'w') as fid: json.dump(self.line[elem.name].data.to_dict(), fid, cls=xo.JEncoder) beam = deepcopy(self.global_parameters["beam"]) beam.read_xsuite_beam_file( fname, zstart=elem.physical.middle.z, s=svals[elem.name], ref_index=self.ref_idx, ) rbf.openpmd.write_openpmd_beam_file( beam, f'{self.global_parameters["master_subdir"]}/{elem.name}.openpmd.hdf5', ) df = self.tws.to_pandas() if self.ref_s is None: self.ref_s = self.startObject.physical.start.z df["s"] += self.ref_s svals = np.array(self.getSValues(at_entrance=False)) + df["s"][0] zvals = [a[-1] for a in self.getZValues()] df["z"] = np.interp(df["s"], svals, zvals) for k in self.beam_data[list(self.beam_data.keys())[0]].keys(): df[k] = [x[k] for x in self.beam_data.values()] df.to_csv(f'{self.global_parameters["master_subdir"]}/{self.objectname}_twiss.csv')