import os
import subprocess
import numpy as np
import yaml
from typing import Dict, Literal
from ...Framework_objects import (
frameworkLattice,
getGrids,
)
from ...FrameworkHelperFunctions import saveFile
from ...Modules import Beams as rbf
from ...Modules.Beams.opal import find_opal_s_positions
from ...Modules.SDDSFile import SDDSFile
# import mpi4py
# mpi4py.rc.initialize = False
from laura.translator.converters.codes.opal import (
opal_option,
opal_distribution,
opal_fieldsolver,
opal_beam,
opal_track,
opal_run,
)
from ...Modules.constants import speed_of_light
from ...Modules.units import UnitValue
[docs]
def update_globals(global_settings, beamlen=None, sample_interval=1):
    """
    Build the OPAL global settings from the packaged defaults plus user overrides.

    The defaults are loaded from ``globals_Opal.yaml`` located next to this module.
    Every default key that is also present in ``global_settings`` is replaced by
    the user's value.

    Parameters
    ----------
    global_settings: dict
        User-supplied settings. Any ``SC_3D_Nxf`` / ``SC_3D_Nyf`` / ``SC_3D_Nzf``
        entry is mirrored into this dictionary as ``MX`` / ``MY`` / ``MT``
        (the dictionary is modified in place).
    beamlen: optional
        Beam size; when truthy, ``beamlen / sample_interval`` is passed to
        ``getGridSizes`` and the result is used for ``MX``, ``MY`` and ``MT`` of
        the ``fieldsolver`` section, taking precedence over any other value.
    sample_interval: optional
        Divisor applied to ``beamlen`` before the grid size is looked up.

    Returns
    -------
    dict
        The settings loaded from the YAML file with the overrides applied.
    """
    grids = getGrids()
    defaults_path = os.path.join(os.path.dirname(__file__), "globals_Opal.yaml")
    # NOTE: yaml.Loader can construct arbitrary Python objects; this is only
    # acceptable because the file is read from this package's own directory.
    with open(defaults_path, "r") as file:
        opalglobal = yaml.load(file, Loader=yaml.Loader)
    # Translate the generic 3D space-charge grid keys into OPAL's names (z -> T).
    for sc in ("x", "y", "z"):
        generic_key = f"SC_3D_N{sc}f"
        if generic_key in global_settings:
            scconv = sc.upper().replace("Z", "T")
            global_settings[f"M{scconv}"] = global_settings[generic_key]
    # Apply the user's value for every default that has been overridden.
    for vals in opalglobal.values():
        for k in vals:
            if k in global_settings:
                vals[k] = global_settings[k]
    if beamlen:
        gridsize = grids.getGridSizes(beamlen / sample_interval)
        opalglobal["fieldsolver"].update(
            {"MX": gridsize, "MY": gridsize, "MT": gridsize}
        )
    return opalglobal
[docs]
class opalLattice(frameworkLattice):
    """
    Class for defining the OPAL lattice object, used for
    converting the :class:`~simba.Framework_objects.frameworkObject`s defined in the
    :class:`~simba.Framework_objects.frameworkLattice` into a string representation of
    the lattice suitable for an OPAL input file.
    """

    code: str = "opal"
    """String indicating the lattice object type"""

    headers: Dict = {}
    """Section headers to be included in the OPAL input file"""

    particle_definition: str | None = None
    """Name of initial particle distribution"""

    time_step_size: float = 2e-12
    """Step size for tracking"""

    breakstr: str = "//----------------------------------------------------------------------------"
    """String used for separating headers in the input file"""

    version: str = "202210"
    """Version of OPAL"""

    maxsteps: int = 1000000
    """Maximum number of steps for tracking; will be set dynamically once the lattice is parsed"""

    ref_s: float | None = None
    """Reference s position"""

    ref_idx: int | None = None
    """Reference particle index"""
def model_post_init(self, __context):
    """
    Resolve :attr:`particle_definition` once the model has been created.

    The value comes from ``file_block["input"]["particle_definition"]`` when that
    entry exists, with ``"initial_distribution"`` translated to ``"laser"``;
    otherwise the name of the lattice start element (``self.start``) is used.
    """
    super().model_post_init(__context)
    has_definition = (
        "input" in self.file_block
        and "particle_definition" in self.file_block["input"]
    )
    if not has_definition:
        self.particle_definition = self.start
        return
    requested = self.file_block["input"]["particle_definition"]
    self.particle_definition = (
        "laser" if requested == "initial_distribution" else requested
    )
@property
def space_charge_mode(self) -> str | None:
"""
Get the space charge mode based on
:attr:`~simba.Framework_objects.frameworkLattice.globalSettings` or
:attr:`~simba.Framework_objects.frameworkLattice.file_block`.
Returns
-------
str
Space charge mode as string, or None if not provided.
"""
if (
"charge" in self.file_block
and "space_charge_mode" in self.file_block["charge"]
):
return self.file_block["charge"]["space_charge_mode"]
elif (
"charge" in self.globalSettings
and "space_charge_mode" in self.globalSettings["charge"]
):
return self.globalSettings["charge"]["space_charge_mode"]
else:
return None
@space_charge_mode.setter
def space_charge_mode(self, mode: Literal["2d", "3d", "2D", "3D"]) -> None:
"""
Set the space charge mode manually ["2D", "3D"].
Parameters
----------
mode: Literal["2d", "3d", "2D", "3D"]
The space charge calculation mode
"""
if "charge" not in self.file_block:
self.file_block["charge"] = {}
self.file_block["charge"]["space_charge_mode"] = mode
[docs]
def write(self):
    """
    Render the lattice as OPAL input and save it as ``<objectname>.in``.

    The current :attr:`headers` are attached to the section, the section is
    converted with ``to_opal`` (the energy argument is the beam's mean ``cpz``
    divided by 1e6), and the result is written into the master subdirectory.
    The path of the written file is appended to ``self.files``.
    """
    self.section.opal_headers = self.headers
    mean_cpz = self.global_parameters["beam"].centroids.mean_cpz.val
    opal_input = self.section.to_opal(
        energy=mean_cpz / 1e6,
        breakstr=self.breakstr,
    )
    master_subdir = self.global_parameters["master_subdir"]
    command_file = f"{master_subdir}/{self.objectname}.in"
    saveFile(command_file, opal_input, "w")
    self.files.append(command_file)
[docs]
def preProcess(self):
    """
    Prepare the OPAL run: export the input beam, build the headers, write the input file.

    After the parent pre-processing the input distribution is read, the beam's
    ``s`` and reference-particle index are stored in :attr:`ref_s` and
    :attr:`ref_idx`, and the beam is exported with :meth:`hdf5_to_opal`. The
    ``option``, ``distribution``, ``fieldsolver``, ``beam``, ``track`` and
    ``run`` headers are then populated and :meth:`write` is called.
    """
    super().preProcess()
    prefix = self.get_prefix()
    # The return value is not needed; the beam is taken from
    # global_parameters["beam"] below (presumably populated by this call).
    self.read_input_file(prefix, self.particle_definition)
    beam = self.global_parameters["beam"]
    self.ref_s = beam.s
    self.ref_idx = beam.reference_particle_index
    self.hdf5_to_opal()
    beamlen = len(beam.x)
    pc = np.mean(beam.cpz.val) / 1e9
    bcurrent = abs(beam.total_charge * 1e6)
    chargesign = int(beam.chargesign[0])
    # hdf5_to_opal() writes "<particle_definition>.opal", so the distribution
    # header and the file list must refer to that same name.
    initobj = self.particle_definition
    self.headers["option"] = opal_option()
    self.headers["distribution"] = opal_distribution(
        input_particle_definition=f"\"{initobj}.opal\""
    )
    self.headers["fieldsolver"] = opal_fieldsolver(
        npart=beamlen,
        sample_interval=self.sample_interval,
        space_charge_mode=str(self.space_charge_mode),
    )
    self.headers["beam"] = opal_beam(
        PC=pc,
        NPART=beamlen,
        CHARGE=chargesign,
        PARTICLE=beam.species.upper(),
        BCURRENT=bcurrent,
    )
    self.headers["track"] = opal_track(
        DT=self.time_step_size,
        MAXSTEPS=self.maxsteps,
        LINE=self.objectname,
        # Tracking stops at the lattice length measured from its start element.
        ZSTOP=self.endObject.physical.end.z - self.startObject.physical.start.z,
    )
    self.headers["run"] = opal_run()
    self.files.append(f"{self.global_parameters['master_subdir']}/{initobj}.opal")
    self.write()
[docs]
def postProcess(self):
    """
    Convert the OPAL output of this lattice into the framework's file formats.

    * For every screen/BPM matched to a step of ``<objectname>.h5`` by
      ``find_opal_s_positions``, the beam at that step is read, offset in ``z``
      and ``t`` and written to ``<element name>.openpmd.hdf5``.
    * The last step is written to ``<end element name>.openpmd.hdf5``.
    * The contents of ``<objectname>.stat`` are copied into
      ``<objectname>.opal_twiss.h5``, with ``s`` offset by :attr:`ref_s` when set.

    Entries of the ``.stat`` data that h5py rejects with ``TypeError`` are
    left out of the output file and reported in a single warning.
    """
    master_subdir = self.global_parameters["master_subdir"]
    opalbeamname = f"{master_subdir}/{self.objectname}.h5"

    # Screen positions relative to the start of this lattice.
    elems = self.getSValues(as_dict=True)
    start_z = self.startObject.physical.start.z
    svals = {s.name: elems[s.name] - start_z for s in self.screens_and_bpms}
    spositions = find_opal_s_positions(opalbeamname, svals, tolerance=1.0)

    for elem in self.screens_and_bpms:
        if elem.name not in spositions:
            continue
        beam = rbf.beam()
        beam.read_opal_beam_file(filename=opalbeamname, step=spositions[elem.name])
        # NOTE(review): the offset uses physical.middle.z whereas svals above and
        # hdf5_to_opal() use physical.start.z -- confirm this is intended.
        z_offset = self.startObject.physical.middle.z
        beam._beam.z = UnitValue(beam._beam.z.val + z_offset, "m")
        beam._beam.t = UnitValue(beam._beam.t.val + (z_offset / speed_of_light), "s")
        rbf.openpmd.write_openpmd_beam_file(
            beam,
            f"{master_subdir}/{elem.name}.openpmd.hdf5",
        )

    # NOTE(review): unlike the screen beams above, the final beam is written
    # without the z/t offset (and replaces the screen file if the end element is
    # itself a screen) -- confirm this is intended.
    beam = rbf.beam()
    beam.read_opal_beam_file(filename=opalbeamname, step=-1)
    rbf.openpmd.write_openpmd_beam_file(
        beam,
        f"{master_subdir}/{self.endObject.name}.openpmd.hdf5",
    )
    self.commandFiles = {}

    opalObject = SDDSFile()
    opalObject.read_file(f"{master_subdir}/{self.objectname}.stat")
    opalData = opalObject.data
    for k in opalData:
        # Arrays with more than one dimension are reduced to their first row
        # (presumably one row per run stored in the file); everything else is
        # converted to an array.
        if isinstance(opalData[k], np.ndarray) and (opalData[k].ndim > 1):
            opalData[k] = opalData[k][0]
        else:
            opalData[k] = np.array(opalData[k])
    if self.ref_s is not None:
        opalData["s"] += self.ref_s

    import warnings

    import h5py

    skipped = []
    with h5py.File(f"{master_subdir}/{self.objectname}.opal_twiss.h5", "w") as f:
        for k, v in opalData.items():
            try:
                f.create_dataset(k, data=np.array(v))
            except TypeError:
                # Best effort: keep writing the remaining entries.
                skipped.append(k)
    if skipped:
        warnings.warn(
            f"Entries {skipped} of {self.objectname}.stat could not be stored in "
            f"{self.objectname}.opal_twiss.h5 and were skipped",
            stacklevel=2,
        )
[docs]
def hdf5_to_opal(self):
    """
    Export the current beam to ``<particle_definition>.opal`` in the master subdirectory.

    The start z of the lattice's first element is passed as ``subz`` and the
    ``emitted`` flag is set only when the particle definition is ``"laser"``.
    """
    master_subdir = self.global_parameters["master_subdir"]
    is_emitted = self.particle_definition == "laser"
    beam_file = master_subdir + "/" + self.particle_definition + ".opal"
    rbf.opal.write_opal_beam_file(
        self.global_parameters["beam"],
        beam_file,
        subz=self.startObject.physical.start.z,
        emitted=is_emitted,
    )
[docs]
def run(self):
    """
    Run the code with input ``<objectname>.in``.

    With ``remote_setup`` set, execution is delegated to ``run_remote``.
    Otherwise the configured executable for :attr:`code`, followed by the input
    file name, is executed through ``bash -c`` inside the master subdirectory,
    with standard output redirected to ``<objectname>.log``. The exit status of
    the command is not checked. On Windows (``os.name == "nt"``) nothing is run.
    """
    if self.remote_setup:
        self.run_remote()
        return
    if os.name == "nt":
        # No local execution path exists for Windows.
        return
    master_subdir = self.global_parameters["master_subdir"]
    shell_line = " ".join(self.executables[self.code] + [self.objectname + ".in"])
    log_path = os.path.abspath(master_subdir + "/" + self.objectname + ".log")
    with open(log_path, "w") as f:
        # Hand the command line to bash as a single argument instead of quoting
        # it into a string for a second shell, so quotes inside the command
        # cannot break the invocation.
        subprocess.call(
            ["bash", "-c", shell_line],
            stdout=f,
            cwd=master_subdir,
        )