Skip to content

Direct inference with TensorFlow 2


TensorFlow 2 is available since CMSSW_11_1_X (cmssw#28711, cmsdist#5525). The integration into the software stack can be found in cmsdist/tensorflow.spec and the interface is located in cmssw/PhysicsTools/TensorFlow.

Available versions

TensorFlow el8_amd64_gcc10 el8_amd64_gcc11
v2.6.0 ≥ CMSSW_12_3_4 -
v2.6.4 ≥ CMSSW_12_5_0 ≥ CMSSW_12_5_0
TensorFlow slc7_amd64_gcc900 slc7_amd64_gcc10 slc7_amd64_gcc11
v2.1.0 ≥ CMSSW_11_1_0 - -
v2.3.1 ≥ CMSSW_11_2_0 - -
v2.4.1 ≥ CMSSW_11_3_0 - -
v2.5.0 ≥ CMSSW_12_0_0 ≥ CMSSW_12_0_0 -
v2.6.0 ≥ CMSSW_12_1_0 ≥ CMSSW_12_1_0 ≥ CMSSW_12_3_0
v2.6.4 - ≥ CMSSW_12_5_0 ≥ CMSSW_13_0_0
TensorFlow slc7_amd64_gcc900
v2.1.0 ≥ CMSSW_11_1_0
v2.3.1 ≥ CMSSW_11_2_0

At this time, only CPU support is provided. While GPU support is generally possible, it is currently disabled due to some interference with production workflows but will be enabled once they are resolved.

Software setup

To run the examples shown below, create a mininmal inference setup with the following snippet. Adapt the SCRAM_ARCH according to your operating system and desired compiler.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
export SCRAM_ARCH="el8_amd64_gcc11"
export CMSSW_VERSION="CMSSW_12_6_0"

source "/cvmfs/cms.cern.ch/cmsset_default.sh" ""

cmsrel "${CMSSW_VERSION}"
cd "${CMSSW_VERSION}/src"

cmsenv
scram b

Below, the cmsml Python package is used to convert models from TensorFlow objects (tf.function's or Keras models) to protobuf graph files (documentation). It should be available after executing the commands above. You can check its version via

python -c "import cmsml; print(cmsml.__version__)"

and compare to the released tags. If you want to install a newer version from either the master branch of the cmsml repository or the Python package index (PyPI), you can simply do that via pip.

# into your user directory (usually ~/.local)
pip install --upgrade --user git+https://github.com/cms-ml/cmsml

# _or_

# into a custom directory
pip install --upgrade --prefix "CUSTOM_DIRECTORY" git+https://github.com/cms-ml/cmsml
# into your user directory (usually ~/.local)
pip install --upgrade --user cmsml

# _or_

# into a custom directory
pip install --upgrade --prefix "CUSTOM_DIRECTORY" cmsml

Saving your model

After successfully training, you should save your model in a protobuf graph file which can be read by the interface in CMSSW. Naturally, you only want to save that part of your model that is required to run the network prediction, i.e., it should not contain operations related to model training or loss functions (unless explicitely required). Also, to reduce the memory footprint and to accelerate the inference, variables should be converted to constant tensors. Both of these model transformations are provided by the cmsml package.

Instructions on how to transform and save your model are shown below, depending on whether you use Keras or plain TensorFlow with tf.function's.

The code below saves a Keras Model instance as a protobuf graph file using cmsml.tensorflow.save_graph. In order for Keras to built the internal graph representation before saving, make sure to either compile the model, or pass an input_shape to the first layer:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# coding: utf-8

import tensorflow as tf
import tf.keras.layers as layers
import cmsml

# define your model
model = tf.keras.Sequential()
model.add(layers.InputLayer(input_shape=(10,), name="input"))
model.add(layers.Dense(100, activation="tanh"))
model.add(layers.Dense(3, activation="softmax", name="output"))

# train it
...

# convert to binary (.pb extension) protobuf
# with variables converted to constants
cmsml.tensorflow.save_graph("graph.pb", model, variables_to_constants=True)

Following the Keras naming conventions for certain layers, the input will be named "input" while the output is named "sequential/output/Softmax". To cross check the names, you can save the graph in text format by using the extension ".pb.txt".

Let's consider you write your network model in a single tf.function.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# coding: utf-8

import tensorflow as tf
import cmsml

# define the model
@tf.function
def model(x):
    # lift variable initialization to the lowest context so they are
    # not re-initialized on every call (eager calls or signature tracing)
    with tf.init_scope():
        W = tf.Variable(tf.ones([10, 1]))
        b = tf.Variable(tf.ones([1]))

    # define your "complex" model here
    h = tf.add(tf.matmul(x, W), b)
    y = tf.tanh(h, name="y")

    return y

In TensorFlow terms, the model function is polymorphic - it accepts different types of the input tensor x (tf.float32, tf.float64, ...). For each type, TensorFlow will create a concrete function with an associated tf.Graph object. This mechanism is referred to as signature tracing. For deeper insights into tf.function, the concepts of signature tracing, polymorphic and concrete functions, see the guide on Better performance with tf.function.

To save the model as a protobuf graph file, you explicitely need to create a concrete function. However, this is fairly easy once you know the exact type and shape of all input arguments.

20
21
22
23
24
25
26
27
# create a concrete function
cmodel = model.get_concrete_function(
    tf.TensorSpec(shape=[2, 10], dtype=tf.float32),
)

# convert to binary (.pb extension) protobuf
# with variables converted to constants
cmsml.tensorflow.save_graph("graph.pb", cmodel, variables_to_constants=True)

The input will be named "x" while the output is named "y". To cross check the names, you can save the graph in text format by using the extension ".pb.txt".

Different method: Frozen signatures

Instead of creating a polymorphic tf.function and extracting a concrete one in a second step, you can directly define an input signature upon definition.

@tf.function(input_signature=(tf.TensorSpec(shape=[2, 10], dtype=tf.float32),))
def model(x):
    ...

This disables signature tracing since the input signature is frozen. However, you can directly pass it to cmsml.tensorflow.save_graph.

Inference in CMSSW

The inference can be implemented to run in a single thread. In general, this does not mean that the module cannot be executed with multiple threads (cmsRun --numThreads <N> <CFG_FILE>), but rather that its performance in terms of evaluation time and especially memory consumption is likely to be suboptimal. Therefore, for modules to be integrated into CMSSW, the multi-threaded implementation is strongly recommended.

CMSSW module setup

If you aim to use the TensorFlow interface in a CMSSW plugin, make sure to include

1
2
3
<use name="PhysicsTools/TensorFlow" />

<flags EDM_PLUGIN="1" />

in your plugins/BuildFile.xml file. If you are using the interface inside the src/ or interface/ directory of your module, make sure to create a global BuildFile.xml file next to theses directories, containing (at least):

1
2
3
4
5
<use name="PhysicsTools/TensorFlow" />

<export>
    <lib name="1" />
</export>

Single-threaded inference

Despite tf.Session being removed in the Python interface as of TensorFlow 2, the concepts of

  • Graph's, containing the constant computational structure and trained variables of your model,
  • Session's, handling execution and data exchange, and
  • the separation between them

live on in the C++ interface. Thus, the overall inference approach is 1) include the interface, 2) initialize Graph and session, 3) per event create input tensors and run the inference, and 4) cleanup.

1. Includes
1
2
3
4
#include "PhysicsTools/TensorFlow/interface/TensorFlow.h"
#include "FWCore/Framework/interface/one/EDAnalyzer.h"
// further framework includes
...
2. Initialize objects
1
2
3
4
5
6
7
8
// configure logging to show warnings (see table below)
tensorflow::setLogging("2");

// load the graph definition
tensorflow::GraphDef* graphDef = tensorflow::loadGraphDef("/path/to/constantgraph.pb");

// create a session
tensorflow::Session* session = tensorflow::createSession(graphDef);
3. Inference
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// create an input tensor
// (example: single batch of 10 values)
tensorflow::Tensor input(tensorflow::DT_FLOAT, { 1, 10 });


// fill the tensor with your input data
// (example: just fill consecutive values)
for (size_t i = 0; i < 10; i++) {
    input.matrix<float>()(0, i) = float(i);
}

// run the evaluation
std::vector<tensorflow::Tensor> outputs;
tensorflow::run(session, { { "input", input } }, { "output" }, &outputs);

// process the output tensor
// (example: print the 5th value of the 0th (the only) example)
std::cout << outputs[0].matrix<float>()(0, 5) << std::endl;
// -> float
4. Cleanup
1
2
tensorflow::closeSession(session);
delete graphDef;
Full example
Click to expand

The example assumes the following directory structure:

MySubsystem/MyModule/
│
├── plugins/
│   ├── MyPlugin.cpp
│   └── BuildFile.xml
│
├── test/
│   └── my_plugin_cfg.py
│
└── data/
    └── graph.pb
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/*
 * Example plugin to demonstrate the direct single-threaded inference with TensorFlow 2.
 */

#include <memory>

#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/one/EDAnalyzer.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "PhysicsTools/TensorFlow/interface/TensorFlow.h"

class MyPlugin : public edm::one::EDAnalyzer<> {
public:
  explicit MyPlugin(const edm::ParameterSet&);
  ~MyPlugin(){};

  static void fillDescriptions(edm::ConfigurationDescriptions&);

private:
  void beginJob();
  void analyze(const edm::Event&, const edm::EventSetup&);
  void endJob();

  std::string graphPath_;
  std::string inputTensorName_;
  std::string outputTensorName_;

  tensorflow::GraphDef* graphDef_;
  tensorflow::Session* session_;
};

void MyPlugin::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
  // defining this function will lead to a *_cfi file being generated when compiling
  edm::ParameterSetDescription desc;
  desc.add<std::string>("graphPath");
  desc.add<std::string>("inputTensorName");
  desc.add<std::string>("outputTensorName");
  descriptions.addWithDefaultLabel(desc);
}

MyPlugin::MyPlugin(const edm::ParameterSet& config)
    : graphPath_(config.getParameter<std::string>("graphPath")),
      inputTensorName_(config.getParameter<std::string>("inputTensorName")),
      outputTensorName_(config.getParameter<std::string>("outputTensorName")),
      graphDef_(nullptr),
      session_(nullptr) {
  // set tensorflow log level to warning
  tensorflow::setLogging("2");
}

void MyPlugin::beginJob() {
  // load the graph
  graphDef_ = tensorflow::loadGraphDef(graphPath_);

  // create a new session and add the graphDef
  session_ = tensorflow::createSession(graphDef_);
}

void MyPlugin::endJob() {
  // close the session
  tensorflow::closeSession(session_);

  // delete the graph
  delete graphDef_;
  graphDef_ = nullptr;
}

void MyPlugin::analyze(const edm::Event& event, const edm::EventSetup& setup) {
  // define a tensor and fill it with range(10)
  tensorflow::Tensor input(tensorflow::DT_FLOAT, {1, 10});
  for (size_t i = 0; i < 10; i++) {
    input.matrix<float>()(0, i) = float(i);
  }

  // define the output and run
  std::vector<tensorflow::Tensor> outputs;
  tensorflow::run(session_, {{inputTensorName_, input}}, {outputTensorName_}, &outputs);

  // print the output
  std::cout << " -> " << outputs[0].matrix<float>()(0, 0) << std::endl << std::endl;
}

DEFINE_FWK_MODULE(MyPlugin);
1
2
3
4
5
6
<use name="FWCore/Framework" />
<use name="FWCore/PluginManager" />
<use name="FWCore/ParameterSet" />
<use name="PhysicsTools/TensorFlow" />

<flags EDM_PLUGIN="1" />
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# coding: utf-8

import os

import FWCore.ParameterSet.Config as cms
from FWCore.ParameterSet.VarParsing import VarParsing


# get the data/ directory
thisdir = os.path.dirname(os.path.abspath(__file__))
datadir = os.path.join(os.path.dirname(thisdir), "data")

# setup minimal options
options = VarParsing("python")
options.setDefault("inputFiles", "root://xrootd-cms.infn.it//store/mc/RunIISummer20UL17MiniAODv2/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_mc2017_realistic_v9-v1/00000/005708B7-331C-904E-88B9-189011E6C9DD.root")  # noqa
options.parseArguments()

# define the process to run
process = cms.Process("TEST")

# minimal configuration
process.load("FWCore.MessageService.MessageLogger_cfi")
process.MessageLogger.cerr.FwkReport.reportEvery = 1
process.maxEvents = cms.untracked.PSet(
    input=cms.untracked.int32(10),
)
process.source = cms.Source(
    "PoolSource",
    fileNames=cms.untracked.vstring(options.inputFiles),
)

# process options
process.options = cms.untracked.PSet(
    allowUnscheduled=cms.untracked.bool(True),
    wantSummary=cms.untracked.bool(True),
)

# setup MyPlugin by loading the auto-generated cfi (see MyPlugin.fillDescriptions)
process.load("MySubsystem.MyModule.myPlugin_cfi")
process.myPlugin.graphPath = cms.string(os.path.join(datadir, "graph.pb"))
process.myPlugin.inputTensorName = cms.string("input")
process.myPlugin.outputTensorName = cms.string("output")

# define what to run in the path
process.p = cms.Path(process.myPlugin)

Multi-threaded inference

Compared to the single-threaded implementation above, the multi-threaded version has one major difference: both the Graph and the Session are no longer members of a particular module instance, but rather shared between all instances in all threads. See the documentation on the C++ interface of stream modules for details.

Recommendation updated

The previous recommendation stated that the Session is not constant and thus, should not be placed in the global cache, but rather created once per stream module instance. However, it was discovered that, although not explicitely declared as constant in the tensorflow::run() / Session::run() interface, the session is actually not changed during evaluation and can be treated as being effectively constant.

As a result, it is safe to move it to the global cache, next to the Graph object. The TensorFlow interface in CMSSW was adjusted in order to accept const objects in cmssw#40161.

Thus, the overall inference approach is 1) include the interface, 2) let your plugin inherit from edm::stream::EDAnalyzerasdasd and declare the GlobalCache, 3) store in cconst Session*, pointing to the cached session, and 4) per event create input tensors and run the inference.

1. Includes
1
2
3
4
#include "PhysicsTools/TensorFlow/interface/TensorFlow.h"
#include "FWCore/Framework/interface/stream/EDAnalyzer.h"
// further framework includes
...

Note that stream/EDAnalyzer.h is included rather than one/EDAnalyzer.h.

2. Define and use the global cache

The cache definition is done by declaring a simple struct. However, for the purpose of just storing a graph and a session object, a so-called tensorflow::SessionCache struct is already provided centrally. It was added in cmssw#40284 and its usage is shown in the following. In case the tensorflow::SessionCache is not (yet) available in your version of CMSSW, expand the "Custom cache struct" section below.

Use it in the edm::GlobalCache template argument and adjust the plugin accordingly.

1
2
3
4
5
6
7
8
9
class MyPlugin : public edm::stream::EDAnalyzer<edm::GlobalCache<tensorflow::SessionCache>> {
public:
    explicit GraphLoadingMT(const edm::ParameterSet&, const tensorflow::SessionCache*);
    ~GraphLoadingMT();

    // additional static methods for initializing and closing the global cache
    static std::unique_ptr<tensorflow::SessionCache> initializeGlobalCache(const edm::ParameterSet&);
    static void globalEndJob(const tensorflow::SessionCache*);
...

Implement initializeGlobalCache to control the behavior of how the cache object is created. You also need to implement globalEndJob, however, it can remain empty as the destructor of tensorflow::SessionCache already handles the closing of the session itself and the deletion of all objects.

std::unique_ptr<tensorflow::SessionCache> MyPlugin::initializeGlobalCache(const edm::ParameterSet& config) {
  std::string graphPath = edm::FileInPath(params.getParameter<std::string>("graphPath")).fullPath();
  return std::make_unique<tensorflow::SessionCache>(graphPath);
}

void MyPlugin::globalEndJob(const tensorflow::SessionCache* cache) {}
Custom cache struct
1
2
3
4
5
6
7
struct MyCache {
  MyCache() : {
  }

  std::atomic<tensorflow::GraphDef*> graph;
  std::atomic<tensorflow::Session*> session;
};

Use it in the edm::GlobalCache template argument and adjust the plugin accordingly.

1
2
3
4
5
6
7
8
9
class MyPlugin : public edm::stream::EDAnalyzer<edm::GlobalCache<CacheData>> {
public:
    explicit GraphLoadingMT(const edm::ParameterSet&, const CacheData*);
    ~GraphLoadingMT();

    // two additional static methods for handling the global cache
    static std::unique_ptr<CacheData> initializeGlobalCache(const edm::ParameterSet&);
    static void globalEndJob(const CacheData*);
...

Implement initializeGlobalCache and globalEndJob to control the behavior of how the cache object is created and destroyed.

See the full example below for more details.

3. Initialize objects

In your module constructor, you can get a pointer to the constant session to perform model evaluation during the event loop.

1
2
3
4
5
6
7
8
// declaration in header
const tensorflow::Session* _session;

// get a pointer to the const session stored in the cache in the constructor init
MyPlugin::MyPlugin(const edm::ParameterSet& config,  const tensorflow::SessionCache* cache)
    : session_(cache->getSession()) {
  ...
}
4. Inference
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// create an input tensor
// (example: single batch of 10 values)
tensorflow::Tensor input(tensorflow::DT_FLOAT, { 1, 10 });


// fill the tensor with your input data
// (example: just fill consecutive values)
for (size_t i = 0; i < 10; i++) {
    input.matrix<float>()(0, i) = float(i);
}

// define the output
std::vector<tensorflow::Tensor> outputs;

// evaluate
// note: in case this line causes the compiler to complain about the const'ness of the session_ in
//       this call, your CMSSW version might not yet support passing a const session, so in this
//       case, pass "const_cast<tensorflow::Session*>(session_)"
tensorflow::run(session_, { { inputTensorName, input } }, { outputTensorName }, &outputs);

// process the output tensor
// (example: print the 5th value of the 0th (the only) example)
std::cout << outputs[0].matrix<float>()(0, 5) << std::endl;
// -> float

Note

If the TensorFlow interface in your CMSSW release does not yet accept const sessions, line 19 in the example above will cause an error during compilation. In this case, replace session_ in that line to

const_cast<tensorflow::Session*>(session_)
Full example
Click to expand

The example assumes the following directory structure:

MySubsystem/MyModule/
│
├── plugins/
│   ├── MyPlugin.cpp
│   └── BuildFile.xml
│
├── test/
│   └── my_plugin_cfg.py
│
└── data/
    └── graph.pb
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*
 * Example plugin to demonstrate the direct multi-threaded inference with TensorFlow 2.
 */

#include <memory>

#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/stream/EDAnalyzer.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "PhysicsTools/TensorFlow/interface/TensorFlow.h"

// put a tensorflow::SessionCache into the global cache structure
// the session cache wraps both a tf graph and a tf session instance and also handles their deletion
class MyPlugin : public edm::stream::EDAnalyzer<edm::GlobalCache<tensorflow::SessionCache>> {
public:
  explicit MyPlugin(const edm::ParameterSet&, const tensorflow::SessionCache*);
  ~MyPlugin(){};

  static void fillDescriptions(edm::ConfigurationDescriptions&);

  // additional static methods for initializing and closing the global cache
  static std::unique_ptr<tensorflow::SessionCache> initializeGlobalCache(const edm::ParameterSet&);
  static void globalEndJob(const tensorflow::SessionCache*);

private:
  void beginJob();
  void analyze(const edm::Event&, const edm::EventSetup&);
  void endJob();

  std::string inputTensorName_;
  std::string outputTensorName_;

  // a pointer to the session created by the global session cache
  const tensorflow::Session* session_;
};

std::unique_ptr<tensorflow::SessionCache> MyPlugin::initializeGlobalCache(const edm::ParameterSet& params) {
  // this method is supposed to create, initialize and return a SessionCache instance
  std::string graphPath = edm::FileInPath(params.getParameter<std::string>("graphPath")).fullPath();
  // Setup the TF backend by configuration
  if (params.getParameter<std::string>("tf_backend") == "cuda"){
    tensorflow::Options options { tensorflow::Backend::cuda};
  }else {
    tensorflow::Options options { tensorflow::Backend::cpu};
  }
  return std::make_unique<tensorflow::SessionCache>(graphPath, options);
}

void MyPlugin::globalEndJob(const tensorflow::SessionCache* cache) {}

void MyPlugin::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
  // defining this function will lead to a *_cfi file being generated when compiling
  edm::ParameterSetDescription desc;
  desc.add<std::string>("graphPath");
  desc.add<std::string>("inputTensorName");
  desc.add<std::string>("outputTensorName");
  descriptions.addWithDefaultLabel(desc);
}

MyPlugin::MyPlugin(const edm::ParameterSet& config,  const tensorflow::SessionCache* cache)
    : inputTensorName_(config.getParameter<std::string>("inputTensorName")),
      outputTensorName_(config.getParameter<std::string>("outputTensorName")),
      session_(cache->getSession()) {}

void MyPlugin::beginJob() {}

void MyPlugin::endJob() {
  // close the session
  tensorflow::closeSession(session_);
}

void MyPlugin::analyze(const edm::Event& event, const edm::EventSetup& setup) {
  // define a tensor and fill it with range(10)
  tensorflow::Tensor input(tensorflow::DT_FLOAT, {1, 10});
  for (size_t i = 0; i < 10; i++) {
    input.matrix<float>()(0, i) = float(i);
  }

  // define the output
  std::vector<tensorflow::Tensor> outputs;

  // evaluate
  // note: in case this line causes the compile to complain about the const'ness of the session_ in
  //       this call, your CMSSW version might not yet support passing a const session, so in this
  //       case, pass "const_cast<tensorflow::Session*>(session_)"
  tensorflow::run(session_, {{inputTensorName_, input}}, {outputTensorName_}, &outputs);

  // print the output
  std::cout << " -> " << outputs[0].matrix<float>()(0, 0) << std::endl << std::endl;
}

DEFINE_FWK_MODULE(MyPlugin);
1
2
3
4
5
6
<use name="FWCore/Framework" />
<use name="FWCore/PluginManager" />
<use name="FWCore/ParameterSet" />
<use name="PhysicsTools/TensorFlow" />

<flags EDM_PLUGIN="1" />
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# coding: utf-8

import os

import FWCore.ParameterSet.Config as cms
from FWCore.ParameterSet.VarParsing import VarParsing


# get the data/ directory
thisdir = os.path.dirname(os.path.abspath(__file__))
datadir = os.path.join(os.path.dirname(thisdir), "data")

# setup minimal options
options = VarParsing("python")
options.setDefault("inputFiles", "root://xrootd-cms.infn.it//store/mc/RunIISummer20UL17MiniAODv2/TTToSemiLeptonic_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_mc2017_realistic_v9-v1/00000/005708B7-331C-904E-88B9-189011E6C9DD.root")  # noqa
options.parseArguments()

# define the process to run
process = cms.Process("TEST")

# minimal configuration
process.load("FWCore.MessageService.MessageLogger_cfi")
process.MessageLogger.cerr.FwkReport.reportEvery = 1
process.maxEvents = cms.untracked.PSet(
    input=cms.untracked.int32(10),
)
process.source = cms.Source(
    "PoolSource",
    fileNames=cms.untracked.vstring(options.inputFiles),
)

# process options
process.options = cms.untracked.PSet(
    allowUnscheduled=cms.untracked.bool(True),
    wantSummary=cms.untracked.bool(True),
)

# setup MyPlugin by loading the auto-generated cfi (see MyPlugin.fillDescriptions)
process.load("MySubsystem.MyModule.myPlugin_cfi")
process.myPlugin.graphPath = cms.string(os.path.join(datadir, "graph.pb"))
process.myPlugin.inputTensorName = cms.string("input")
process.myPlugin.outputTensorName = cms.string("output")

# define what to run in the path
process.p = cms.Path(process.myPlugin)

GPU backend

By default the TensorFlow sessions get created for CPU running. Since CMSSW_13_1_X the GPU backend for TensorFlow is available in the cmssw release.

Minimal changes are needed in the inference code to move the model on the GPU. A tensorflow::Options struct is available to setup the backend.

1
2
3
4
5
6
tensorflow::Options options { tensorflow::Backend::cuda};

# Initialize the cache
tensorflow::SessionCache cache(pbFile, options);
# or a single session
const tensorflow::Session* session = tensorflow::createSession(graphDef, options);

CMSSW modules should add an options in the PSets of the producers and analyzers to configure on the fly the TensorFlow backend for the sessions created by the plugins.

Optimization

Depending on the use case, the following approaches can optimize the inference performance. It could be worth checking them out in your algorithm.

Further optimization approaches can be found in the integration checklist.

Reusing tensors

In some cases, instead of creating new input tensors for each inference call, you might want to store input tensors as members of your plugin. This is of course possible if you know its exact shape a-prioro and comes with the cost of keeping the tensor in memory for the lifetime of your module instance.

You can use

tensor.flat<float>().setZero();

to reset the values of your tensor prior to each call.

Tensor data access via pointers

As shown in the examples above, tensor data can be accessed through methods such as flat<type>() or matrix<type>() which return objects that represent the underlying data in the requested structure (tensorflow::Tensor C++ API). To read and manipulate particular elements, you can directly call this object with the coordinates of an element.

// matrix returns a 2D representation
// set element (b,i) to f
tensor.matrix<float>()(b, i) = float(f);

However, doing this for a large input tensor might entail some overhead. Since the data is actually contiguous in memory (C-style "row-major" memory ordering), a faster (though less explicit) way of interacting with tensor data is using a pointer.

// get the pointer to the first tensor element
float* d = tensor.flat<float>().data();

Now, the tensor data can be filled using simple and fast pointer arithmetic.

// fill tensor data using pointer arithmethic
// memory ordering is row-major, so the most outer loop corresponds dimension 0
for (size_t b = 0; b < batchSize; b++) {
    for (size_t i = 0; i < nFeatures; i++, d++) {  // note the d++
        *d = float(i);
    }
}

Inter- and intra-operation parallelism

Debugging and local processing only

Parallelism between (inter) and within (intra) operations can greatly improve the inference performance. However, this allows TensorFlow to manage and schedule threads on its own, possibly interfering with the thread model inherent to CMSSW. For inference code that is to be officially integrated, you should avoid inter- and intra-op parallelism and rather adhere to the examples shown above.

You can configure the amount of inter- and infra-op threads via the second argument of the tensorflow::createSession method.

1
tensorflow::Session* session = tensorflow::createSession(graphDef, nThreads);
1
2
3
4
5
tensorflow::SessionOptions sessionOptions;
sessionOptions.config.set_intra_op_parallelism_threads(nThreads);
sessionOptions.config.set_inter_op_parallelism_threads(nThreads);

tensorflow::Session* session = tensorflow::createSession(graphDef, sessionOptions);

Then, when calling tensorflow::run, pass the internal name of the TensorFlow threadpool, i.e. "tensorflow", as the last argument.

1
2
3
4
5
6
7
8
std::vector<tensorflow::Tensor> outputs;
tensorflow::run(
    session,
    { { inputTensorName, input } },
    { outputTensorName },
    &outputs,
    "tensorflow"
);

Miscellaneous

Logging

By default, TensorFlow logging is quite verbose. This can be changed by either setting the TF_CPP_MIN_LOG_LEVEL environment varibale before calling cmsRun, or within your code through tensorflow::setLogging(level).

Verbosity level TF_CPP_MIN_LOG_LEVEL
debug "0"
info "1" (default)
warning "2"
error "3"
none "4"

Forwarding logs to the MessageLogger service is not possible yet.


Authors: Marcel Rieger


Last update: January 8, 2024