# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""Extensions to the CompilerEnv environment for LLVM."""
import os
import shutil
from pathlib import Path
from typing import Iterable, List, Optional, Union, cast
import numpy as np
from gym.spaces import Box
from gym.spaces import Dict as DictSpace
from compiler_gym.datasets import Benchmark, BenchmarkInitError, Dataset
from compiler_gym.envs.compiler_env import CompilerEnv
from compiler_gym.envs.llvm.datasets import get_llvm_datasets
from compiler_gym.envs.llvm.llvm_benchmark import ClangInvocation, make_benchmark
from compiler_gym.envs.llvm.llvm_rewards import (
BaselineImprovementNormalizedReward,
CostFunctionReward,
NormalizedReward,
)
from compiler_gym.spaces import Commandline, CommandlineFlag, Scalar, Sequence
from compiler_gym.third_party.autophase import AUTOPHASE_FEATURE_NAMES
from compiler_gym.third_party.inst2vec import Inst2vecEncoder
from compiler_gym.third_party.llvm import download_llvm_files
from compiler_gym.third_party.llvm.instcount import INST_COUNT_FEATURE_NAMES
from compiler_gym.util.runfiles_path import runfiles_path
# Paths of the three text files that describe the LLVM pass actions. Each file
# is parsed by _read_list_file() below, and the flags and descriptions files are
# matched to the action names by position when the module-level tables are
# built.

# Names of the actions.
_ACTIONS_LIST = Path(
runfiles_path("compiler_gym/envs/llvm/service/passes/actions_list.txt")
)
# Command line flag for each action.
_FLAGS_LIST = Path(
runfiles_path("compiler_gym/envs/llvm/service/passes/actions_flags.txt")
)
# Human-readable description of each action.
_DESCRIPTIONS_LIST = Path(
runfiles_path("compiler_gym/envs/llvm/service/passes/actions_descriptions.txt")
)
def _read_list_file(path: Path) -> Iterable[str]:
    """Lazily yield the non-blank lines of a text file.

    Leading and trailing whitespace is removed from every line, and lines that
    are empty after stripping are skipped. The file is opened when iteration
    starts and stays open until the generator is exhausted or closed.

    :param path: The file to read.
    :return: An iterator over the stripped, non-empty lines, in file order.
    """
    with open(str(path)) as handle:
        stripped_lines = (raw_line.strip() for raw_line in handle)
        yield from (entry for entry in stripped_lines if entry)
# TODO(github.com/facebookresearch/CompilerGym/issues/122): Replace text file
# parsing with build-generated python modules and import them.
# Action names, in the order they appear in the actions list file.
_ACTIONS = list(_read_list_file(_ACTIONS_LIST))
# Lookup tables keyed by action name. Entries are paired with _ACTIONS purely by
# position. NOTE: zip() stops at the shortest input, so if the files ever
# disagree in length the surplus entries are dropped without any error.
_FLAGS = dict(zip(_ACTIONS, _read_list_file(_FLAGS_LIST)))
_DESCRIPTIONS = dict(zip(_ACTIONS, _read_list_file(_DESCRIPTIONS_LIST)))
# A single encoder instance, created at import time and shared by every LlvmEnv
# (each environment exposes it as `self.inst2vec`).
_INST2VEC_ENCODER = Inst2vecEncoder()
# Cache for the default-location datasets; populated on first use by
# _get_llvm_datasets().
_LLVM_DATASETS: Optional[List[Dataset]] = None
def _get_llvm_datasets(site_data_base: Optional[Path] = None) -> Iterable[Dataset]:
    """Return the LLVM datasets for the given site data location.

    When no location is given, the datasets are loaded once and the same list
    is returned on every later call. An explicit location always produces a
    fresh, uncached result.

    :param site_data_base: Forwarded to :code:`get_llvm_datasets()`.
    :return: The datasets.
    """
    global _LLVM_DATASETS

    # An explicit location bypasses the module-level cache entirely.
    if site_data_base is not None:
        return get_llvm_datasets(site_data_base=site_data_base)

    if _LLVM_DATASETS is None:
        _LLVM_DATASETS = list(get_llvm_datasets(site_data_base=None))
    return _LLVM_DATASETS
[docs]class LlvmEnv(CompilerEnv):
"""A specialized CompilerEnv for LLVM.
This extends the default :class:`CompilerEnv
<compiler_gym.envs.CompilerEnv>` environment, adding extra LLVM
functionality. Specifically, the actions use the :class:`CommandlineFlag
<compiler_gym.spaces.CommandlineFlag>` space, which is a type of
:code:`Discrete` space that provides additional documentation about each
action, and the :meth:`LlvmEnv.commandline()
<compiler_gym.envs.LlvmEnv.commandline>` method can be used to produce an
equivalent LLVM opt invocation for the current environment state.
"""
def __init__(
    self,
    *args,
    benchmark: Optional[Union[str, Benchmark]] = None,
    datasets_site_path: Optional[Path] = None,
    **kwargs,
):
    """Construct an LLVM environment.

    Registers the LLVM reward spaces, the structured :code:`CpuInfo`
    observation space, and a set of observation spaces derived from the
    :code:`Ir`, :code:`InstCount`, :code:`InstCountNorm` and
    :code:`Autophase` base observations.

    :param benchmark: The benchmark to use. When omitted (or otherwise
        falsy), :code:`"cbench-v1/qsort"` is used.
    :param datasets_site_path: Passed as :code:`site_data_base` when loading
        the LLVM datasets. When :code:`None`, a list of datasets shared at
        module level is used.

    All other positional and keyword arguments are forwarded to the
    :class:`CompilerEnv <compiler_gym.envs.CompilerEnv>` constructor.
    """
    # First perform a one-time download of LLVM binaries that are needed by
    # the LLVM service and are not included by the pip-installed package.
    download_llvm_files()
    # Eight reward spaces are registered: for each of the two cost functions
    # (IR instruction count and object .text size) there is a raw cost reward,
    # a normalized reward, and rewards relative to the -O3 and -Oz baselines.
    # The object size rewards are flagged as platform dependent.
    super().__init__(
        *args,
        **kwargs,
        # Set a default benchmark for use.
        benchmark=benchmark or "cbench-v1/qsort",
        datasets=_get_llvm_datasets(site_data_base=datasets_site_path),
        rewards=[
            CostFunctionReward(
                id="IrInstructionCount",
                cost_function="IrInstructionCount",
                init_cost_function="IrInstructionCountO0",
                default_negates_returns=True,
                deterministic=True,
                platform_dependent=False,
            ),
            NormalizedReward(
                id="IrInstructionCountNorm",
                cost_function="IrInstructionCount",
                init_cost_function="IrInstructionCountO0",
                max=1,
                default_negates_returns=True,
                deterministic=True,
                platform_dependent=False,
            ),
            BaselineImprovementNormalizedReward(
                id="IrInstructionCountO3",
                cost_function="IrInstructionCount",
                baseline_cost_function="IrInstructionCountO3",
                init_cost_function="IrInstructionCountO0",
                success_threshold=1,
                default_negates_returns=True,
                deterministic=True,
                platform_dependent=False,
            ),
            BaselineImprovementNormalizedReward(
                id="IrInstructionCountOz",
                cost_function="IrInstructionCount",
                baseline_cost_function="IrInstructionCountOz",
                init_cost_function="IrInstructionCountO0",
                success_threshold=1,
                default_negates_returns=True,
                deterministic=True,
                platform_dependent=False,
            ),
            CostFunctionReward(
                id="ObjectTextSizeBytes",
                cost_function="ObjectTextSizeBytes",
                init_cost_function="ObjectTextSizeO0",
                default_negates_returns=True,
                deterministic=True,
                platform_dependent=True,
            ),
            NormalizedReward(
                id="ObjectTextSizeNorm",
                cost_function="ObjectTextSizeBytes",
                init_cost_function="ObjectTextSizeO0",
                max=1,
                default_negates_returns=True,
                deterministic=True,
                platform_dependent=True,
            ),
            BaselineImprovementNormalizedReward(
                id="ObjectTextSizeO3",
                cost_function="ObjectTextSizeBytes",
                init_cost_function="ObjectTextSizeO0",
                baseline_cost_function="ObjectTextSizeO3",
                success_threshold=1,
                default_negates_returns=True,
                deterministic=True,
                platform_dependent=True,
            ),
            BaselineImprovementNormalizedReward(
                id="ObjectTextSizeOz",
                cost_function="ObjectTextSizeBytes",
                init_cost_function="ObjectTextSizeO0",
                baseline_cost_function="ObjectTextSizeOz",
                success_threshold=1,
                default_negates_returns=True,
                deterministic=True,
                platform_dependent=True,
            ),
        ],
    )

    # Mutable runtime configuration options that must be set on every call
    # to reset.
    self._runtimes_per_observation_count: Optional[int] = None
    self._runtimes_warmup_per_observation_count: Optional[int] = None

    # Every environment shares the module-level inst2vec encoder.
    self.inst2vec = _INST2VEC_ENCODER

    # Replace the space of the CpuInfo observation with a structured
    # description of its fields.
    self.observation.spaces["CpuInfo"].space = DictSpace(
        {
            "name": Sequence(size_range=(0, None), dtype=str),
            "cores_count": Scalar(min=None, max=None, dtype=int),
            "l1i_cache_size": Scalar(min=None, max=None, dtype=int),
            "l1i_cache_count": Scalar(min=None, max=None, dtype=int),
            "l1d_cache_size": Scalar(min=None, max=None, dtype=int),
            "l1d_cache_count": Scalar(min=None, max=None, dtype=int),
            "l2_cache_size": Scalar(min=None, max=None, dtype=int),
            "l2_cache_count": Scalar(min=None, max=None, dtype=int),
            "l3_cache_size": Scalar(min=None, max=None, dtype=int),
            "l3_cache_count": Scalar(min=None, max=None, dtype=int),
            "l4_cache_size": Scalar(min=None, max=None, dtype=int),
            "l4_cache_count": Scalar(min=None, max=None, dtype=int),
        }
    )

    # inst2vec observations, each computed from the "Ir" observation:
    # preprocessed text -> embedding indices -> embedding vectors.
    self.observation.add_derived_space(
        id="Inst2vecPreprocessedText",
        base_id="Ir",
        space=Sequence(size_range=(0, None), dtype=str),
        translate=self.inst2vec.preprocess,
        default_value="",
    )
    self.observation.add_derived_space(
        id="Inst2vecEmbeddingIndices",
        base_id="Ir",
        space=Sequence(size_range=(0, None), dtype=np.int32),
        translate=lambda base_observation: self.inst2vec.encode(
            self.inst2vec.preprocess(base_observation)
        ),
        default_value=np.array([self.inst2vec.vocab["!UNK"]]),
    )
    self.observation.add_derived_space(
        id="Inst2vec",
        base_id="Ir",
        space=Sequence(size_range=(0, None), dtype=np.ndarray),
        translate=lambda base_observation: self.inst2vec.embed(
            self.inst2vec.encode(self.inst2vec.preprocess(base_observation))
        ),
        default_value=np.vstack(
            [self.inst2vec.embeddings[self.inst2vec.vocab["!UNK"]]]
        ),
    )

    # Dictionary view of the InstCount feature vector, keyed "<name>Count".
    self.observation.add_derived_space(
        id="InstCountDict",
        base_id="InstCount",
        space=DictSpace(
            {
                f"{name}Count": Scalar(min=0, max=None, dtype=int)
                for name in INST_COUNT_FEATURE_NAMES
            }
        ),
        translate=lambda base_observation: {
            f"{name}Count": val
            for name, val in zip(INST_COUNT_FEATURE_NAMES, base_observation)
        },
    )

    # InstCount features divided by the first feature. max(..., 1) guards
    # against division by zero, and the first feature itself is dropped.
    self.observation.add_derived_space(
        id="InstCountNorm",
        base_id="InstCount",
        space=Box(
            low=0,
            high=1,
            shape=(len(INST_COUNT_FEATURE_NAMES) - 1,),
            dtype=np.float32,
        ),
        translate=lambda base_observation: (
            base_observation[1:] / max(base_observation[0], 1)
        ).astype(np.float32),
    )

    # Dictionary view of InstCountNorm, keyed "<name>Density". The values are
    # the float32 entries of the InstCountNorm vector, so the space is
    # declared with a float32 dtype rather than int.
    self.observation.add_derived_space(
        id="InstCountNormDict",
        base_id="InstCountNorm",
        space=DictSpace(
            {
                f"{name}Density": Scalar(min=0, max=None, dtype=np.float32)
                for name in INST_COUNT_FEATURE_NAMES[1:]
            }
        ),
        translate=lambda base_observation: {
            f"{name}Density": val
            for name, val in zip(INST_COUNT_FEATURE_NAMES[1:], base_observation)
        },
    )

    # Dictionary view of the Autophase feature vector, keyed by feature name.
    self.observation.add_derived_space(
        id="AutophaseDict",
        base_id="Autophase",
        space=DictSpace(
            {
                name: Scalar(min=0, max=None, dtype=int)
                for name in AUTOPHASE_FEATURE_NAMES
            }
        ),
        translate=lambda base_observation: {
            name: val
            for name, val in zip(AUTOPHASE_FEATURE_NAMES, base_observation)
        },
    )
def reset(self, *args, **kwargs):
    """Reset the environment and re-apply the runtime observation settings.

    All arguments are forwarded to the parent class's :code:`reset()`. After a
    successful reset, any runtime count or warmup count that was previously
    set through the corresponding property is sent to the service again.

    :return: The observation returned by the parent class's :code:`reset()`.
    :raises BenchmarkInitError: If the parent reset fails with a
        :code:`ValueError` whose message matches one of the known benchmark
        initialization failures. Any other :code:`ValueError` propagates
        unchanged.
    """
    # ValueError messages that mean the benchmark's input file was unusable.
    file_error_markers = (
        "File not found:",
        "File is empty:",
        "Error reading file:",
    )

    try:
        observation = super().reset(*args, **kwargs)
    except ValueError as e:
        # Catch and re-raise some known benchmark initialization errors with
        # a more informative error type.
        message = str(e)
        if "Failed to compute .text size cost" in message:
            raise BenchmarkInitError(
                f"Failed to initialize benchmark {self._benchmark_in_use.uri}: {e}"
            ) from e
        if any(marker in message for marker in file_error_markers):
            raise BenchmarkInitError(message) from e
        raise

    # Resend the runtime session parameters, if they hold non-default values.
    # Assigning to the properties is what sends them to the service.
    runtimes_count = self._runtimes_per_observation_count
    if runtimes_count is not None:
        self.runtime_observation_count = runtimes_count

    warmup_count = self._runtimes_warmup_per_observation_count
    if warmup_count is not None:
        self.runtime_warmup_runs_count = warmup_count

    return observation
def make_benchmark(
    self,
    inputs: Union[
        str, Path, ClangInvocation, List[Union[str, Path, ClangInvocation]]
    ],
    copt: Optional[List[str]] = None,
    system_includes: bool = True,
    timeout: int = 600,
) -> Benchmark:
    """Create a benchmark for use with this environment.

    One or more inputs are turned into a benchmark that can be passed to
    :meth:`compiler_gym.envs.LlvmEnv.reset`. This method forwards all of its
    arguments to the module-level :code:`make_benchmark()` function.

    A single-source C/C++ program can be given by the path of its source
    file:

        >>> env = gym.make("llvm-v0")
        >>> benchmark = env.make_benchmark('my_app.c')
        >>> env.reset(benchmark=benchmark)

    The clang invocation used is roughly equivalent to:

    .. code-block::

        $ clang my_app.c -O0 -c -emit-llvm -o benchmark.bc

    Extra compile-time arguments for clang go in :code:`copt`:

        >>> benchmark = env.make_benchmark('/path/to/my_app.cpp', copt=['-O2'])

    For finer control over the options, construct a :class:`ClangInvocation
    <compiler_gym.envs.llvm.ClangInvocation>` holding the full list of clang
    arguments:

        >>> benchmark = env.make_benchmark(
            ClangInvocation(['/path/to/my_app.c'], timeout=10)
        )

    A multi-file program is given as a list of inputs, which are compiled
    separately and then linked into a single module:

        >>> benchmark = env.make_benchmark([
            'main.c',
            'lib.cpp',
            'lib2.bc',
        ])

    Bitcode files that have already been prepared can be linked and used
    directly:

        >>> benchmark = env.make_benchmark([
            'bitcode1.bc',
            'bitcode2.bc',
        ])

    .. note::

        LLVM bitcode compatibility is
        `not guaranteed <https://llvm.org/docs/DeveloperPolicy.html#ir-backwards-compatibility>`_,
        so any precompiled bitcodes must be compatible with the LLVM version
        used by CompilerGym, which can be queried using
        :func:`LlvmEnv.compiler_version <compiler_gym.envs.CompilerEnv.compiler_version>`.

    :param inputs: An input, or list of inputs.
    :param copt: A list of command line options to pass to clang when
        compiling source files.
    :param system_includes: Whether to include the system standard libraries
        during compilation jobs. This requires a system toolchain. See
        :func:`get_system_includes`.
    :param timeout: The maximum number of seconds to allow clang to run
        before terminating.
    :return: A :code:`Benchmark` instance.
    :raises FileNotFoundError: If any input sources are not found.
    :raises TypeError: If the inputs are of unsupported types.
    :raises OSError: If a compilation job fails.
    :raises TimeoutExpired: If a compilation job exceeds :code:`timeout`
        seconds.
    """
    # Inside the method body, the name `make_benchmark` resolves to the
    # module-level function, not to this method.
    compile_options = {
        "copt": copt,
        "system_includes": system_includes,
        "timeout": timeout,
    }
    return make_benchmark(inputs=inputs, **compile_options)
def _make_action_space(self, name: str, entries: List[str]) -> Commandline:
    """Build a :code:`Commandline` action space from a list of action names.

    The flag and description of every entry are looked up in the module-level
    :code:`_FLAGS` and :code:`_DESCRIPTIONS` tables.

    :param name: The name given to the action space.
    :param entries: The action names, in the order they should appear.
    :return: The action space.
    :raises KeyError: If an entry is missing from either lookup table.
    """
    flags = []
    for action_name in entries:
        flags.append(
            CommandlineFlag(
                name=action_name,
                flag=_FLAGS[action_name],
                description=_DESCRIPTIONS[action_name],
            )
        )
    return Commandline(items=flags, name=name)
def commandline(  # pylint: disable=arguments-differ
    self, textformat: bool = False
) -> str:
    """Return an LLVM :code:`opt` command line invocation for the current
    environment state.

    :param textformat: If true, the command line reads and writes text-format
        LLVM-IR (:code:`input.ll` / :code:`output.ll`). Otherwise (the
        default) it reads and writes bitcode.
    :returns: A command line string.
    """
    action_space = cast(Commandline, self.action_space)
    passes = action_space.commandline(self.actions)
    return (
        f"opt {passes} input.ll -S -o output.ll"
        if textformat
        else f"opt {passes} input.bc -o output.bc"
    )
def commandline_to_actions(self, commandline: str) -> List[int]:
    """Convert a command line back into a list of actions.

    The command line must start with :code:`"opt "` and end with one of the
    two input/output suffixes produced by :meth:`env.commandline()
    <compiler_gym.envs.LlvmEnv.commandline>`. Whatever lies in between is
    handed to the action space's :code:`from_commandline()`.

    :param commandline: A command line invocation, as generated by
        :meth:`env.commandline() <compiler_gym.envs.LlvmEnv.commandline>`.
    :return: A list of actions.
    :raises ValueError: In case the command line string is malformed.
    """
    # The decorative elements that LlvmEnv.commandline() adds.
    prefix = "opt "
    known_suffixes = (" input.ll -S -o output.ll", " input.bc -o output.bc")

    if commandline.startswith(prefix):
        for suffix in known_suffixes:
            if commandline.endswith(suffix):
                flags = commandline[len(prefix) : -len(suffix)]
                return self.action_space.from_commandline(flags)

    raise ValueError(f"Invalid commandline: `{commandline}`")
@property
def ir(self) -> str:
    """The LLVM-IR of the program in its current state.

    Alias for :code:`env.observation["Ir"]`.

    :return: A string of LLVM-IR.
    """
    llvm_ir = self.observation["Ir"]
    return llvm_ir
@property
def ir_sha1(self) -> str:
    """The 40-character hex sha1 checksum of the current IR.

    Alias for :code:`env.observation["IrSha1"]`. Equivalent to:
    :code:`hashlib.sha1(env.ir.encode("utf-8")).hexdigest()`.

    :return: A 40-character hexadecimal sha1 string.
    """
    checksum = self.observation["IrSha1"]
    return checksum
def write_ir(self, path: Union[Path, str]) -> Path:
    """Write the current program state to a text file of LLVM-IR.

    The file is always written as UTF-8, so its contents do not depend on the
    locale of the machine. This is the same encoding the :code:`ir_sha1`
    property documents for its checksum.

    :param path: The path of the file to write. A leading :code:`~` is
        expanded to the user's home directory.
    :return: The :code:`path` argument as a :code:`Path`, after user
        expansion.
    """
    path = Path(path).expanduser()
    # Name the encoding explicitly: the default for open() is locale dependent.
    with open(path, "w", encoding="utf-8") as f:
        f.write(self.ir)
    return path
def write_bitcode(self, path: Union[Path, str]) -> Path:
    """Write the current program state to a bitcode file.

    The :code:`BitcodeFile` observation supplies the path of a file, which is
    copied to the destination. That source file is deleted afterwards, whether
    or not the copy succeeded.

    :param path: The path of the file to write. A leading :code:`~` is
        expanded to the user's home directory.
    :return: The :code:`path` argument as a :code:`Path`, after user
        expansion.
    """
    destination = Path(path).expanduser()
    observed_file = self.observation["BitcodeFile"]
    try:
        shutil.copyfile(observed_file, destination)
    finally:
        # Always remove the file named by the observation, even on failure.
        os.unlink(observed_file)
    return destination
def render(
    self,
    mode="human",
) -> Optional[str]:
    """Render the environment.

    :param mode: In :code:`"human"` mode the current LLVM-IR is printed to
        stdout. Any other mode is delegated to the parent class.
    :return: :code:`None` in :code:`"human"` mode, otherwise whatever the
        parent class's :code:`render()` returns.
    """
    if mode != "human":
        return super().render(mode)
    print(self.ir)
    return None
@property
def runtime_observation_count(self) -> int:
    """The number of runtimes to return for the Runtime observation space.

    See the :ref:`Runtime observation space reference <llvm/index:Runtime>`
    for further details.

    Example usage:

        >>> env = compiler_gym.make("llvm-v0")
        >>> env.reset()
        >>> env.runtime_observation_count = 10
        >>> len(env.observation.Runtime())
        10

    :getter: Returns the number of runtimes that will be returned when a
        :code:`Runtime` observation is requested.

    :setter: Set the number of runtimes to compute when a :code:`Runtime`
        observation is requested.

    :type: int
    """
    reply = self.send_param("llvm.get_runtimes_per_observation_count", "")
    return int(reply)


@runtime_observation_count.setter
def runtime_observation_count(self, n: int) -> None:
    # Keep a local copy so that reset() can send the value again.
    self._runtimes_per_observation_count = n
    encoded = str(n)
    self.send_param("llvm.set_runtimes_per_observation_count", encoded)
@property
def runtime_warmup_runs_count(self) -> int:
    """The number of warmup runs of the binary to perform before measuring
    the Runtime observation space.

    See the :ref:`Runtime observation space reference <llvm/index:Runtime>`
    for further details.

    Example usage:

        >>> env = compiler_gym.make("llvm-v0")
        >>> env.reset()
        >>> env.runtime_warmup_runs_count = 5
        >>> env.runtime_warmup_runs_count
        5

    :getter: Returns the number of warmup runs that will be performed before
        a :code:`Runtime` observation is measured.

    :setter: Set the number of warmup runs to perform when a :code:`Runtime`
        observation is requested.

    :type: int
    """
    reply = self.send_param(
        "llvm.get_warmup_runs_count_per_runtime_observation", ""
    )
    return int(reply)


@runtime_warmup_runs_count.setter
def runtime_warmup_runs_count(self, n: int) -> None:
    # Keep a local copy so that reset() can send the value again.
    self._runtimes_warmup_per_observation_count = n
    encoded = str(n)
    self.send_param("llvm.set_warmup_runs_count_per_runtime_observation", encoded)