Source code for simba.Codes.GPT.GPT

"""
Simframe GPT Module

Various objects and functions to handle GPT lattices and commands.

Classes:
    - :class:`~simba.Codes.GPT.GPT.gptLattice`: The GPT lattice object, used for
    converting the :class:`~simba.Framework_objects.frameworkObject` s defined in the
    :class:`~simba.Framework_objects.frameworkLattice` into a string representation of
    the lattice suitable for GPT input and lattice files.

    - :class:`~simba.Codes.GPT.GPT.gpt_element`: Base class for defining
    commands in a GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_setfile`: Class for defining the
    input files for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_charge`: Class for defining the
    bunch charge for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_setreduce`: Class for reducing the
    number of particles for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_accuracy`: Class for setting the
    accuracy for GPT tracking.

    - :class:`~simba.Codes.GPT.GPT.gpt_spacecharge`: Class for defining the
    space charge setup for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_tout`: Class for defining the
    number of steps for particle distribution output for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_csr1d`: Class for defining the
    CSR calculations for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_writefloorplan`: Class for setting up the
    writing of the lattice floor plan for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_Zminmax`: Class for defining the
    minimum and maximum z-positions for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_forwardscatter`: Class for defining
    scattering parameters for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_scatterplate`: Class for defining a
    scattering object for the GPT input file.

    - :class:`~simba.Codes.GPT.GPT.gpt_dtmaxt`: Class for defining the
    step size(s) for the GPT input file.
"""

import os
import subprocess
import numpy as np
from laura.models.diagnostic import DiagnosticElement

from ...Framework_objects import frameworkLattice
from ...FrameworkHelperFunctions import saveFile
from ...Modules import Beams as rbf
from ...Modules.constants import speed_of_light
from ...Modules.units import UnitValue
from ...Modules.gdf_beam import gdf_beam
from typing import Dict, Literal, Any
from laura.translator.converters.codes.gpt import (
    gpt_setfile,
    gpt_charge,
    gpt_setreduce,
    gpt_accuracy,
    gpt_spacecharge,
    gpt_tout,
    gpt_csr1d,
    gpt_writefloorplan,
    gpt_Zminmax,
    gpt_forwardscatter,
    gpt_scatterplate,
    gpt_dtmaxt,
)

gpt_defaults = {}


[docs] class gptLattice(frameworkLattice): """ Class for defining the GPT lattice object, used for converting the :class:`~simba.Framework_objects.frameworkObject`s defined in the :class:`~simba.Framework_objects.frameworkLattice` into a string representation of the lattice suitable for a GPT input file. """ code: str = "gpt" """String indicating the lattice object type""" allow_negative_drifts: bool = True """Flag to indicate whether negative drifts are allowed""" bunch_charge: float | None = None """Bunch charge""" headers: Dict = {} """Headers to be included in the GPT lattice file""" ignore_start_screen: Any = None """Flag to indicate whether to ignore the first screen in the lattice""" screen_step_size: float = 0.1 """Step size for screen output""" time_step_size: float = 1e-11 """Step size for output data during tracking""" override_meanBz: float | int | None = None """Set the average particle longitudinal velocity manually""" override_tout: float | int | None = None """Set the time step output manually""" accuracy: int = 6 """Tracking accuracy""" endScreenObject: Any = None """Final screen object for dumping particle distributions""" Brho: UnitValue | None = None """Magnetic rigidity""" particle_definition: str = None """Initial particle definition""" dtmin: float | None = None """Integration time step size""" def model_post_init(self, __context): super().model_post_init(__context) if ( "input" in self.file_block and "particle_definition" in self.file_block["input"] ): if ( self.file_block["input"]["particle_definition"] == "initial_distribution" ): self.particle_definition = "laser" else: self.particle_definition = self.file_block["input"][ "particle_definition" ] else: self.particle_definition = self.start self.headers["setfile"] = gpt_setfile( set='"beam"', filename='"' + self.name + '.gdf"' ) self.headers["floorplan"] = gpt_writefloorplan( filename='"' + self.objectname + '_floor.gdf"' ) @property def space_charge_mode(self) -> str | None: """ Get the space charge mode based on :attr:`~simba.Framework_objects.frameworkLattice.globalSettings` or :attr:`~simba.Framework_objects.frameworkLattice.file_block`. Returns ------- str Space charge mode as string, or None if not provided. """ if ( "charge" in self.file_block and "space_charge_mode" in self.file_block["charge"] ): return self.file_block["charge"]["space_charge_mode"] elif ( "charge" in self.globalSettings and "space_charge_mode" in self.globalSettings["charge"] ): return self.globalSettings["charge"]["space_charge_mode"] else: return None @space_charge_mode.setter def space_charge_mode(self, mode: Literal["2d", "3d", "2D", "3D"]) -> None: """ Set the space charge mode manually ["2D", "3D"]. Parameters ---------- mode: Literal["2d", "3d", "2D", "3D"] The space charge calculation mode """ if "charge" not in self.file_block: self.file_block["charge"] = {} self.file_block["charge"]["space_charge_mode"] = mode
[docs] def writeElements(self) -> str: """ Write the lattice elements defined in this object into a GPT-compatible format; see :attr:`~simba.Framework_objects.frameworkLattice.elementObjects`. The appropriate headers required for GPT are written at the top of the file, see the `write_GPT` function in :class:`~simba.Codes.GPT.gpt_element`. Returns ------- str The lattice represented as a string compatible with GPT """ self.headers["accuracy"] = gpt_accuracy(accuracy=self.accuracy) if "charge" not in self.file_block: self.file_block["charge"] = {} if "charge" not in self.globalSettings: self.globalSettings["charge"] = {} space_charge_dict = self.file_block["charge"] | self.globalSettings["charge"] space_charge = self.global_parameters | space_charge_dict self.headers["spacecharge"] = gpt_spacecharge(**space_charge) if self.particle_definition == "laser" and self.space_charge_mode is not None: self.headers["spacecharge"].npart = len(self.global_parameters["beam"].x) self.headers["spacecharge"].sample_interval = self.sample_interval # self.headers["spacecharge"].space_charge_mode = "cathode" if ( self.csr_enable and len(self.dipoles) > 0 and max([abs(d.angle) for d in self.dipoles]) > 0 ): # and not os.name == 'nt': self.headers["csr1d"] = gpt_csr1d() # print('CSR Enabled!', self.objectname, len(self.dipoles)) # self.headers['forwardscatter'] = gpt_forwardscatter(ECS='"wcs", "I"', name='cathode', probability=0) # self.headers['scatterplate'] = gpt_scatterplate(ECS='"wcs", "z", -1e-6', model='cathode', a=1, b=1) self.headers["setfile"].particle_definition = self.particle_definition self.section.gpt_headers = self.headers fulltext = self.section.to_gpt( startz=self.startObject.physical.start.z, endz=self.endObject.physical.end.z, Brho=self.global_parameters["beam"].Brho, dtmin=self.dtmin # screen_step_size=self.screen_step_size, ) return fulltext
[docs] def write(self) -> None: """ Writes the GPT input file from :func:`~simba.Codes.GPT.gptLattice.writeElements` to <master_subdir>/<self.objectname>.in. """ code_file = ( self.global_parameters["master_subdir"] + "/" + self.objectname + ".in" ) saveFile(code_file, self.writeElements()) self.files.append(code_file)
[docs] def preProcess(self) -> None: """ Convert the beam file from the previous lattice section into GPT format and set the number of particles based on the input distribution, see :func:`~simba.Codes.GPT.GPT.gptLattice.hdf5_to_astra`. """ super().preProcess() self.headers["setfile"].particle_definition = self.objectname + ".gdf" prefix = self.get_prefix() self.hdf5_to_gdf(prefix)
[docs] def run(self) -> None: """ Run the code with input 'filename' `GPTLICENSE` must be provided in :attr:`~simba.Framework_objects.frameworkLattice.global_parameters`. Average properties of the distribution are also calculated and written to an `<>emit.gdf` file in `master_subdir`. """ main_command = ( self.executables[self.code] + ["-o", self.objectname + "_out.gdf"] + ["GPTLICENSE=" + self.global_parameters["GPTLICENSE"]] + [self.objectname + ".in"] ) my_env = os.environ.copy() my_env["LD_LIBRARY_PATH"] = ( my_env["LD_LIBRARY_PATH"] + ":/opt/GPT3.3.6/lib/" if "LD_LIBRARY_PATH" in my_env else "/opt/GPT3.3.6/lib/" ) my_env["OMP_WAIT_POLICY"] = "PASSIVE" post_command = ( [self.executables[self.code][0].replace("gpt", "gdfa")] + ["-o", self.objectname + "_emit.gdf"] + [self.objectname + "_out.gdf"] + [ "position", "Q", "avgx", "avgy", "avgz", "stdx", "stdBx", "stdy", "stdBy", "stdz", "stdt", "nemixrms", "nemiyrms", "nemizrms", "numpar", "nemirrms", "avgG", "avgp", "stdG", "avgt", "avgBx", "avgBy", "avgBz", "CSalphax", "CSalphay", "CSbetax", "CSbetay", ] ) post_command_t = ( [self.executables[self.code][0].replace("gpt", "gdfa")] + ["-o", self.objectname + "_emitt.gdf"] + [self.objectname + "_out.gdf"] + [ "time", "Q", "avgx", "avgy", "avgz", "stdx", "stdBx", "stdy", "stdBy", "stdz", "nemixrms", "nemiyrms", "nemizrms", "numpar", "nemirrms", "avgG", "avgp", "stdG", "avgBx", "avgBy", "avgBz", "CSalphax", "CSalphay", "CSbetax", "CSbetay", "avgfBx", "avgfEx", "avgfBy", "avgfEy", "avgfBz", "avgfEz", ] ) post_command_traj = ( [self.executables[self.code][0].replace("gpt", "gdfa")] + ["-o", self.objectname + "traj.gdf"] + [self.objectname + "_out.gdf"] + ["time", "Q", "avgx", "avgy", "avgz"] ) with open( os.path.abspath( self.global_parameters["master_subdir"] + "/" + self.objectname + ".bat" ), "w", ) as batfile: for command in [ main_command, post_command, post_command_t, post_command_traj, ]: output = '"' + command[0] + '" ' for c in command[1:]: output += c + " " output += "\n" batfile.write(output) with open( os.path.abspath( self.global_parameters["master_subdir"] + "/" + self.objectname + ".log" ), "w", ) as f: # print('gpt command = ', command) subprocess.call( main_command, stdout=f, cwd=self.global_parameters["master_subdir"], env=my_env, ) subprocess.call( post_command, stdout=f, cwd=self.global_parameters["master_subdir"] ) subprocess.call( post_command_t, stdout=f, cwd=self.global_parameters["master_subdir"] ) subprocess.call( post_command_traj, stdout=f, cwd=self.global_parameters["master_subdir"] )
[docs] def postProcess(self) -> None: """ Convert the beam file(s) from the GPT output into HDF5 format, see :func:`~simba.Elements.screen.screen.gdf_to_hdf5`. """ super().postProcess() cathode = self.particle_definition == "laser" svals = np.array(self.getSValues(at_entrance=False)) + self.startObject.physical.start.z zvals = [a[-1] for a in self.getZValues()] gdfbeam = rbf.gdf.read_gdf_beam_file_object( f'{self.global_parameters["master_subdir"]}/{self.objectname}_out.gdf' ) for e in self.screens_and_markers_and_bpms: if not e.name == self.start: sval = np.interp(e.physical.middle.z, zvals, svals) self.gdf_to_hdf5( gptbeamfilename=self.objectname + "_out.gdf", screen=e, cathode=cathode, gdf=gdfbeam, t0=self.headers["setfile"].time, sval=sval, ) # else: # print('Ignoring', self.ignore_start_screen.objectname) sval = np.interp(self.endObject.physical.middle.z, zvals, svals) self.gdf_to_hdf5( gptbeamfilename=self.objectname + "_out.gdf", screen=self.endObject, cathode=cathode, gdf=gdfbeam, t0=self.headers["setfile"].time, sval=sval, )
[docs] def hdf5_to_gdf(self, prefix: str="") -> None: """ Convert the HDF5 beam distribution to GDF format. Certain properties of this class, including :attr:`~simba.Codes.GPT.GPT.gptLattice.sample_interval`, :attr:`~simba.Codes.GPT.GPT.gptLattice.override_meanBz`, :attr:`~simba.Codes.GPT.GPT.gptLattice.override_tout` are also used to update :attr:`~simba.Codes.GPT.GPT.gptLattice.headers`. Parameters ---------- prefix: str HDF5 file prefix """ self.read_input_file(prefix, self.particle_definition) if self.particle_definition == "laser": self.global_parameters["beam"].z = UnitValue(0 * self.global_parameters["beam"].t, units="m") self.headers["setfile"].time = np.mean(self.global_parameters["beam"].t) if self.sample_interval > 1: self.headers["setreduce"] = gpt_setreduce( set='"beam"', setreduce=int( len(self.global_parameters["beam"].x) / self.sample_interval ), ) if self.override_meanBz is not None and isinstance( self.override_meanBz, (int, float) ): meanBz = self.override_meanBz else: meanBz = np.mean(self.global_parameters["beam"].Bz) if meanBz < 0.5: meanBz = 0.75 if self.override_tout is not None and isinstance( self.override_tout, (int, float) ): self.headers["tout"] = gpt_tout( starttime=0, endpos=self.override_tout, step=str(self.time_step_size) ) else: endpos = ( self.findS(self.endObject.name)[0][1] - self.findS(self.startObject.name)[0][1] ) self.headers["tout"] = gpt_tout( starttime=0, endpos=endpos / meanBz / speed_of_light, step=str(self.time_step_size), ) self.global_parameters["beam"].beam.rematchXPlane( **self.initial_twiss["horizontal"] ) self.global_parameters["beam"].beam.rematchYPlane( **self.initial_twiss["vertical"] ) gdfbeamfilename = self.objectname + ".gdf" cathode = self.particle_definition == "laser" rbf.gdf.write_gdf_beam_file( self.global_parameters["beam"], self.global_parameters["master_subdir"] + "/" + gdfbeamfilename, normaliseX=self.startObject.physical.middle.x, cathode=cathode, ) self.Brho = self.global_parameters["beam"].Brho self.files.append(self.global_parameters["master_subdir"] + "/" + gdfbeamfilename)
[docs] def gdf_to_hdf5( self, screen: DiagnosticElement, gptbeamfilename: str, cathode: bool = False, gdf: gdf_beam | None = None, t0: float = 0.0, sval: float = 0.0, ) -> None: """ Convert the GDF beam file to HDF5 format and write the beam file. Parameters ---------- screen: laura.models.diagnostic.DiagnosticElement Diagnostic element gptbeamfilename: str Name of GPT beam file cathode: bool True if beam was emitted from a cathode gdf: gdfbeam or None GDF beam object t0: float Initial time co-ordinate sval: float S-position of screen """ # gptbeamfilename = self.objectname + '.' + str(int(round((self.allElementObjects[self.end].position_end[2])*100))).zfill(4) + '.' + str(master_run_no).zfill(3) # try: # print('Converting screen', self.objectname,'at', self.gpt_screen_position) beam = rbf.beam() rbf.gdf.read_gdf_beam_file( beam, os.path.join(self.global_parameters["master_subdir"], gptbeamfilename), position=screen.physical.middle.z, gdfbeam=gdf, ) self.beam.t += t0 self.beam.s = UnitValue(sval, units="m") HDF5filename = screen.name + ".openpmd.hdf5" rbf.openpmd.write_openpmd_beam_file( beam, self.global_parameters["master_subdir"] + "/" + HDF5filename, ) # except: # print('Error with screen', self.objectname,'at', self.gpt_screen_position) if self.global_parameters["delete_tracking_files"]: os.remove( ( os.path.join( self.global_parameters["master_subdir"], gptbeamfilename ) ).strip('"') )