/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/python/lib/core/py_func.h"

#include <array>

#include "numpy/arrayobject.h"
#include "tensorflow/c/eager/c_api.h"
#include "tensorflow/c/eager/c_api_internal.h"
#include "tensorflow/c/tf_status_helper.h"
#include "tensorflow/core/framework/allocation_description.pb.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/python/eager/pywrap_tfe.h"
#include "tensorflow/python/lib/core/ndarray_tensor_bridge.h"
#include "tensorflow/python/lib/core/py_util.h"
#include "tensorflow/python/lib/core/safe_ptr.h"

#include <Python.h>

namespace tensorflow {
namespace {

static mutex mu(LINKER_INITIALIZED);
static PyObject* py_trampoline GUARDED_BY(mu) = nullptr;

// Returns the py_trampoline that is used to pass control to the
// Python runtime.
PyObject* GetPyTrampoline() {
  mutex_lock l(mu);
  return py_trampoline;
}

// A call to the registered python function.
struct PyCall {
  // Passed to the Python runtime to call the Python function registered
  // with this "token".
  string token;

  // The device on which Tensors are stored; only used for EagerPyFunc.
  Device* device;

  // True if and only if the op has been placed on a GPU.
  bool gpu;

  // True if the call is associated with an EagerPyFunc.
  bool eager;

  // Inputs and outputs of this function invocation.
  std::vector<Tensor> ins;
  std::vector<Tensor> out;
};

// Given 'call', prepares the token and inputs as a Python tuple
// that is appropriate for calling the trampoline.
Status MakeArgTuple(const PyCall* call, PyObject** tuple) {
  int64 n = call->ins.size();
  PyObject* lst = PyList_New(n);
  CHECK(lst);
  for (int64 i = 0; i < n; ++i) {
    PyObject* arg = nullptr;
    const Tensor& t = call->ins[i];
    if (call->eager) {
      if (call->gpu) {
        arg = EagerTensorFromHandle(new TFE_TensorHandle(t, call->device));
      } else {
        // TFE_TensorHandle assumes that CPU is identified by `nullptr`.
        arg = EagerTensorFromHandle(new TFE_TensorHandle(t, nullptr));
      }
      if (arg == nullptr) {
        Py_DECREF(lst);
        return errors::Internal("Unable to procure EagerTensor from Tensor.");
      }
    } else {
      Status s = ConvertTensorToNdarray(t, &arg);
      if (!s.ok()) {
        Py_DECREF(lst);
        return s;
      }
    }
    PyList_SetItem(lst, i, arg);
  }
  *tuple = Py_BuildValue("(sON)", call->token.c_str(),
                         call->gpu ? Py_True : Py_False, lst);
  CHECK(*tuple);
  return Status::OK();
}

// Returns in 'tf' the TensorFlow dtype corresponding to the numpy data
// type 'np'.  Returns an error if the type is not supported by this
// module.
Status NumericNpDTypeToTfDType(const int np, DataType* tf) {
  switch (np) {
    case NPY_FLOAT16:
      *tf = DT_HALF;
      break;
    case NPY_FLOAT32:
      *tf = DT_FLOAT;
      break;
    case NPY_FLOAT64:
      *tf = DT_DOUBLE;
      break;
    case NPY_INT32:
      *tf = DT_INT32;
      break;
    case NPY_UINT8:
      *tf = DT_UINT8;
      break;
    case NPY_INT8:
      *tf = DT_INT8;
      break;
    case NPY_INT16:
      *tf = DT_INT16;
      break;
    case NPY_INT64:
      *tf = DT_INT64;
      break;
    case NPY_BOOL:
      *tf = DT_BOOL;
      break;
    case NPY_COMPLEX64:
      *tf = DT_COMPLEX64;
      break;
    case NPY_COMPLEX128:
      *tf = DT_COMPLEX128;
      break;
    default:
      return errors::Unimplemented("Unsupported numpy type ", np);
  }
  return Status::OK();
}

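// Returns true iff `obj` is a zero-dimensional NumPy array whose single
// element is Python's `None` (used below to detect a py_func that returned
// no value).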
bool IsSingleNone(PyObject* obj) {
  if (!PyArray_Check(obj)) {
    return false;
  }
  PyArrayObject* array_obj = reinterpret_cast<PyArrayObject*>(obj);
  if (PyArray_NDIM(array_obj) != 0 || PyArray_SIZE(array_obj) != 1) {
    return false;
  }
  std::array<npy_intp, 0> indices;
  char* item_ptr =
      static_cast<char*>(PyArray_GetPtr(array_obj, indices.data()));
  PyObject* item = PyArray_GETITEM(array_obj, item_ptr);
  CHECK(item);
  return item == Py_None;
}

// Retrieves a Tensor from `eager_tensor` and stores it in `output_tensor`.
void ExtractTensorFromEagerTensor(const PyObject* eager_tensor,
                                  Tensor* output_tensor) {
  *output_tensor = EagerTensor_Handle(eager_tensor)->t;
}

// Calls the registered py function through the trampoline.
Status DoCallPyFunc(PyCall* call, bool* out_log_on_error) {
  *out_log_on_error = true;
  PyObject* trampoline = GetPyTrampoline();
  if (trampoline == nullptr) {
    return errors::InvalidArgument(
        "Missing py trampoline. Most likely, it is a link error.");
  }
  // Prepares the argument tuple.
  PyObject* args = nullptr;
  TF_RETURN_IF_ERROR(MakeArgTuple(call, &args));
  CHECK(args);

  // Invokes the trampoline.
  PyObject* result = PyEval_CallObject(trampoline, args);
  Py_DECREF(args);
  if (result == nullptr) {
    if (PyErr_Occurred()) {
      if (PyErr_ExceptionMatches(PyExc_ValueError) ||
          PyErr_ExceptionMatches(PyExc_TypeError)) {
        return errors::InvalidArgument(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
        *out_log_on_error = false;
        return errors::OutOfRange(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
        return errors::ResourceExhausted(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_NotImplementedError)) {
        return errors::Unimplemented(PyExceptionFetch());
      } else {
        // TODO(ebrevdo): Check if exception is an OpError and use the
        // OpError.error_code property to map it back in the Status.
        return errors::Unknown(PyExceptionFetch());
      }
    } else {
      return errors::Internal("Failed to run py callback ", call->token,
                              ": see error log.");
    }
  }

  // Processes the return values and converts them to TF Tensors.
  Status s = Status::OK();
  if (PyList_Check(result)) {
    // `result` is a Python list; if this operation is an `EagerPyFunc`, then
    // every item in the list must be an `EagerTensor`; otherwise, every item
    // must be a NumPy array.
    call->out.clear();
    for (int i = 0; i < PyList_Size(result); ++i) {
      Tensor t;
      if (call->eager) {
        const PyObject* item = PyList_GetItem(result, i);
        if (EagerTensor_CheckExact(item)) {
          ExtractTensorFromEagerTensor(item, &t);
        } else {
          s = errors::FailedPrecondition(
              "Expected EagerTensor, found PyObject of type: ",
              Py_TYPE(item)->tp_name);
        }
      } else {
        s = ConvertNdarrayToTensor(PyList_GetItem(result, i), &t);
      }

      if (!s.ok()) {
        break;
      }
      call->out.push_back(t);
    }
  } else if (EagerTensor_CheckExact(result) || result == Py_None) {
    // `result` is an `EagerTensor` or `None`.
    DCHECK(call->eager);
    Tensor t;
    if (result != Py_None) {
      ExtractTensorFromEagerTensor(result, &t);
      call->out.push_back(t);
    }
  } else if (PyArray_Check(result)) {
    // `result` is a NumPy array.
    DCHECK(!call->eager);
    if (!IsSingleNone(result)) {
      Tensor t;
      s = ConvertNdarrayToTensor(result, &t);
      if (s.ok()) {
        call->out.push_back(t);
      }
    }
  } else {
    s = errors::Internal("Unexpected PyObject was returned: ",
                         Py_TYPE(result)->tp_name);
  }
  Py_DECREF(result);
  return s;
}

}  // end namespace

// Outside anonymous namespace just to make the friend declaration in
// tensorflow::Tensor apply.
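//
// A TensorBuffer backed by the storage of a NumPy array: it lets a Tensor
// alias the array's memory without copying and, on destruction, schedules a
// decref of the array (performed later while the GIL is held).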
class NumpyTensorBuffer : public TensorBuffer {
 public:
  NumpyTensorBuffer(PyArrayObject* array, size_t len, void* data)
      : array_(array), len_(len), data_(data) {}

  ~NumpyTensorBuffer() override {
    // Note: The session::run wrapper is responsible for freeing this while
    // holding the GIL.
    DelayedNumpyDecref(data_, len_, array_);
  }

  void* data() const override { return data_; }
  size_t size() const override { return len_; }
  TensorBuffer* root_buffer() override { return this; }
  void FillAllocationDescription(AllocationDescription* proto) const override {
    tensorflow::int64 rb = size();
    proto->set_requested_bytes(rb);
    proto->set_allocator_name(tensorflow::cpu_allocator()->Name());
  }
  Tensor MakeTensor(DataType dtype, const TensorShape& shape) {
    CHECK_EQ(len_, shape.num_elements() * DataTypeSize(dtype));
    return Tensor(dtype, shape, this);
  }

  // Prevents input forwarding from overwriting this buffer.
  bool OwnsMemory() const override { return false; }

 private:
  PyArrayObject* array_;
  size_t len_;
  void* data_;
};

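// Converts the NumPy array `obj` into a Tensor stored in `ret`. Numeric
// arrays whose data is suitably aligned are aliased via NumpyTensorBuffer
// (no copy); object/string arrays and unaligned data are copied.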
Status ConvertNdarrayToTensor(PyObject* obj, Tensor* ret) {
  PyArrayObject* input = reinterpret_cast<PyArrayObject*>(obj);
  DataType dtype = DT_INVALID;
  TensorShape shape;
  for (int i = 0; i < PyArray_NDIM(input); ++i) {
    shape.AddDim(PyArray_SHAPE(input)[i]);
  }
  const int np_type = PyArray_TYPE(input);
  switch (np_type) {
    case NPY_OBJECT: {
      dtype = DT_STRING;
      Tensor t(dtype, shape);
      auto tflat = t.flat<string>();
      PyObject** input_data = reinterpret_cast<PyObject**>(PyArray_DATA(input));
      for (int i = 0; i < tflat.dimension(0); ++i) {
        char* el;
        Py_ssize_t el_size;
        if (PyBytes_AsStringAndSize(input_data[i], &el, &el_size) == -1) {
#if PY_MAJOR_VERSION >= 3
          el = PyUnicode_AsUTF8AndSize(input_data[i], &el_size);
#else
          el = nullptr;
          if (PyUnicode_Check(input_data[i])) {
            PyObject* unicode = PyUnicode_AsUTF8String(input_data[i]);
            if (unicode) {
              if (PyString_AsStringAndSize(unicode, &el, &el_size) == -1) {
                Py_DECREF(unicode);
                el = nullptr;
              }
            }
          }
#endif
          if (!el) {
            return errors::Unimplemented("Unsupported object type ",
                                         input_data[i]->ob_type->tp_name);
          }
        }
        tflat(i) = string(el, el_size);
      }
      *ret = t;
      break;
    }
    case NPY_STRING: {
      dtype = DT_STRING;
      Tensor t(dtype, shape);
      auto tflat = t.flat<string>();
      char* input_data = PyArray_BYTES(input);
      Py_ssize_t el_size = PyArray_ITEMSIZE(input);
      for (int i = 0; i < tflat.dimension(0); ++i) {
        tflat(i) = string(input_data + i * el_size, el_size);
      }
      *ret = t;
      break;
    }
    default: {
      TF_RETURN_IF_ERROR(NumericNpDTypeToTfDType(PyArray_TYPE(input), &dtype));
      CHECK(DataTypeCanUseMemcpy(dtype));
      if (reinterpret_cast<intptr_t>(PyArray_DATA(input)) %
              EIGEN_MAX_ALIGN_BYTES !=
          0) {
        Tensor t(dtype, shape);
        StringPiece p = t.tensor_data();
        memcpy(const_cast<char*>(p.data()), PyArray_DATA(input), p.size());
        *ret = t;
      } else {
        // Incref the array as the calling context will decref it when we
        // return and we want to keep a handle to this memory.
        Py_INCREF(input);
        NumpyTensorBuffer* buf = new NumpyTensorBuffer(
            input, shape.num_elements() * DataTypeSize(dtype),
            PyArray_DATA(input));
        *ret = buf->MakeTensor(dtype, shape);
        buf->Unref();
      }
    }
  }
  return Status::OK();
}

// Creates a numpy array in 'ret' that either aliases the content of 't' or
// holds a copy of it.
Status ConvertTensorToNdarray(const Tensor& t, PyObject** ret) {
  int typenum = -1;
  TF_RETURN_IF_ERROR(TF_DataType_to_PyArray_TYPE(
      static_cast<TF_DataType>(t.dtype()), &typenum));
  PyArray_Descr* descr = PyArray_DescrFromType(typenum);
  CHECK(descr);
  std::vector<npy_intp> dims;
  dims.reserve(t.dims());
  for (int i = 0; i < t.dims(); ++i) {
    dims.push_back(t.dim_size(i));
  }
  Tensor* copy = new Tensor(t);
  if (ArrayFromMemory(dims.size(), dims.data(),
                      const_cast<char*>(copy->tensor_data().data()), t.dtype(),
                      [copy]() { delete copy; }, ret)
          .ok()) {
    return Status::OK();
  }
  delete copy;

  PyObject* obj = PyArray_Empty(dims.size(), dims.data(), descr, 0);
  if (obj == nullptr) {
    return errors::Internal("Failed to allocate np array: ",
                            t.shape().DebugString());
  }
  PyArrayObject* np_array = reinterpret_cast<PyArrayObject*>(obj);
  if (typenum == NPY_OBJECT) {
    CHECK_EQ(DT_STRING, t.dtype());
    auto tflat = t.flat<string>();
    PyObject** out = reinterpret_cast<PyObject**>(PyArray_DATA(np_array));
    for (int i = 0; i < tflat.dimension(0); ++i) {
      const string& el = tflat(i);
      out[i] = PyBytes_FromStringAndSize(el.data(), el.size());
      if (out[i] == nullptr) {
        for (int j = 0; j < i; ++j) {
          Py_DECREF(out[j]);
        }
        Py_DECREF(obj);
        return errors::Internal("Failed to allocate a copy of string ", i);
      }
    }
  } else {
    CHECK(DataTypeCanUseMemcpy(t.dtype()));
    StringPiece p = t.tensor_data();
    memcpy(PyArray_DATA(np_array), p.data(), p.size());
  }
  *ret = PyArray_Return(np_array);
  return Status::OK();
}

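// Registers `trampoline` as the Python callable used to dispatch py_func
// calls. Takes a new reference to it; should only be called once.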
void InitializePyTrampoline(PyObject* trampoline) {
  mutex_lock l(mu);
  if (py_trampoline == nullptr) {
    py_trampoline = trampoline;
    Py_INCREF(py_trampoline);
  } else {
    LOG(WARNING) << "InitializePyTrampoline should only be called once";
  }
}

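// Kernel implementing the PyFunc, PyFuncStateless, and EagerPyFunc ops: it
// forwards the op's inputs to the registered Python function via the
// trampoline and copies the returned values into the op's outputs.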
class PyFuncOp : public OpKernel {
 public:
  explicit PyFuncOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("token", &token_));
    eager_ = type_string() == "EagerPyFunc";
    gpu_ = ctx->device_type().type_string() == DEVICE_GPU;
  }

  void Compute(OpKernelContext* ctx) override {
    PyCall call;
    call.token = token_;
    call.gpu = gpu_;
    call.eager = eager_;
    if (call.eager) {
      // Eager's C API uses `Device`, whereas `OpKernelContext` stores a
      // `DeviceBase`; attempt to downcast.
      call.device = dynamic_cast<Device*>(ctx->device());
      if (call.device == nullptr) {
        ctx->CtxFailureWithWarning(
            errors::Internal("Unrecognized device class"));
        return;
      }
    }

    for (int i = 0; i < ctx->num_inputs(); ++i) {
      call.ins.push_back(ctx->input(i));
    }

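    // Invokes the Python function via the trampoline while holding the GIL.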
    PyGILState_STATE py_threadstate;
    py_threadstate = PyGILState_Ensure();
    bool log_on_error;
    Status s = DoCallPyFunc(&call, &log_on_error);
    // Sometimes py_funcs can be called without a session and leak memory. This
    // ensures we clear the decref cache so this doesn't happen.
    ClearDecrefCache();
    PyGILState_Release(py_threadstate);

    // Ensures that GIL is released even when !s.ok().
    if (!s.ok()) {
      if (log_on_error) {
        ctx->CtxFailureWithWarning(s);
      } else {
        ctx->CtxFailure(s);
      }
      return;
    }

    OP_REQUIRES(ctx, static_cast<int32>(call.out.size()) == ctx->num_outputs(),
                errors::InvalidArgument(token_, " returns ", call.out.size(),
                                        " values, but expects to see ",
                                        ctx->num_outputs(), " values."));
    for (size_t i = 0; i < call.out.size(); ++i) {
      const auto& t = call.out[i];
      OP_REQUIRES(
          ctx, t.dtype() == output_type(i),
          errors::InvalidArgument(i, "-th value returned by ", token_, " is ",
                                  DataTypeString(t.dtype()), ", but expects ",
                                  DataTypeString(output_type(i))));
      ctx->set_output(i, t);
    }
  }

 private:
  string token_;

  // True if and only if this op has been placed on a GPU.
  bool gpu_;

  // True if and only if this op should execute the python function eagerly,
  // i.e., if and only if the eager attribute is set.
  bool eager_;

  TF_DISALLOW_COPY_AND_ASSIGN(PyFuncOp);
};

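// PyFunc and PyFuncStateless are CPU-only; EagerPyFunc is also registered on
// GPU, in which case inputs are wrapped as EagerTensors on the op's device
// (see MakeArgTuple).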
REGISTER_KERNEL_BUILDER(Name("PyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("PyFuncStateless").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_GPU), PyFuncOp);

}  // end namespace tensorflow