From 914cdc2a02721defe4c947bb02ff8b298f2bffa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Mon, 2 Mar 2026 21:59:24 +0100 Subject: [PATCH 1/3] Implement remote evaluation and distributed garbage collection --- c_src/python.cpp | 2 + c_src/python.hpp | 1 + c_src/pythonx.cpp | 275 ++++++++++++++++++++++++---------- lib/pythonx.ex | 212 ++++++++++++++++++++++++-- lib/pythonx/application.ex | 3 +- lib/pythonx/encoder.ex | 8 + lib/pythonx/error.ex | 12 +- lib/pythonx/nif.ex | 6 +- lib/pythonx/object.ex | 26 +++- lib/pythonx/object_tracker.ex | 204 +++++++++++++++++++++++++ mix.exs | 1 + mix.lock | 1 + test/pythonx_test.exs | 230 ++++++++++++++++++++++++++++ test/test_helper.exs | 40 ++++- 14 files changed, 912 insertions(+), 109 deletions(-) create mode 100644 lib/pythonx/object_tracker.ex diff --git a/c_src/python.cpp b/c_src/python.cpp index 70be734..e30139c 100644 --- a/c_src/python.cpp +++ b/c_src/python.cpp @@ -27,6 +27,7 @@ DEF_SYMBOL(PyDict_Next) DEF_SYMBOL(PyDict_SetItem) DEF_SYMBOL(PyDict_SetItemString) DEF_SYMBOL(PyDict_Size) +DEF_SYMBOL(PyErr_Clear) DEF_SYMBOL(PyErr_Fetch) DEF_SYMBOL(PyErr_Occurred) DEF_SYMBOL(PyEval_GetBuiltins) @@ -101,6 +102,7 @@ void load_python_library(std::string path) { LOAD_SYMBOL(python_library, PyDict_SetItem) LOAD_SYMBOL(python_library, PyDict_SetItemString) LOAD_SYMBOL(python_library, PyDict_Size) + LOAD_SYMBOL(python_library, PyErr_Clear) LOAD_SYMBOL(python_library, PyErr_Fetch) LOAD_SYMBOL(python_library, PyErr_Occurred) LOAD_SYMBOL(python_library, PyEval_GetBuiltins) diff --git a/c_src/python.hpp b/c_src/python.hpp index 3bbbdca..70b5d95 100644 --- a/c_src/python.hpp +++ b/c_src/python.hpp @@ -81,6 +81,7 @@ extern int (*PyDict_Next)(PyObjectPtr, Py_ssize_t *, PyObjectPtr *, extern int (*PyDict_SetItem)(PyObjectPtr, PyObjectPtr, PyObjectPtr); extern int (*PyDict_SetItemString)(PyObjectPtr, const char *, PyObjectPtr); extern Py_ssize_t (*PyDict_Size)(PyObjectPtr); +extern void (*PyErr_Clear)(); extern void (*PyErr_Fetch)(PyObjectPtr *, PyObjectPtr *, PyObjectPtr *); extern PyObjectPtr (*PyErr_Occurred)(); extern PyObjectPtr (*PyEval_GetBuiltins)(); diff --git a/c_src/pythonx.cpp b/c_src/pythonx.cpp index 976b051..07d41af 100644 --- a/c_src/pythonx.cpp +++ b/c_src/pythonx.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include "python.hpp" @@ -138,15 +139,14 @@ auto ElixirPythonxJanitor = fine::Atom("Elixir.Pythonx.Janitor"); auto ElixirPythonxObject = fine::Atom("Elixir.Pythonx.Object"); auto decref = fine::Atom("decref"); auto integer = fine::Atom("integer"); +auto lines = fine::Atom("lines"); auto list = fine::Atom("list"); auto map = fine::Atom("map"); auto map_set = fine::Atom("map_set"); auto output = fine::Atom("output"); +auto remote_info = fine::Atom("remote_info"); auto resource = fine::Atom("resource"); -auto traceback = fine::Atom("traceback"); auto tuple = fine::Atom("tuple"); -auto type = fine::Atom("type"); -auto value = fine::Atom("value"); } // namespace atoms struct PyObjectResource { @@ -186,8 +186,26 @@ struct PyObjectResource { FINE_RESOURCE(PyObjectResource); +// A resource that notifies the given process upon garbage collection. +struct GCNotifier { + ErlNifPid pid; + ErlNifEnv *message_env; + ERL_NIF_TERM message_term; + + GCNotifier(ErlNifPid pid, ErlNifEnv *message_env, ERL_NIF_TERM message_term) + : pid(pid), message_env(message_env), message_term(message_term) {} + + void destructor(ErlNifEnv *env) { + enif_send(env, &pid, message_env, message_term); + enif_free_env(message_env); + } +}; + +FINE_RESOURCE(GCNotifier); + struct ExObject { fine::ResourcePtr resource; + std::optional remote_info; ExObject() {} ExObject(fine::ResourcePtr resource) : resource(resource) {} @@ -196,26 +214,21 @@ struct ExObject { static constexpr auto fields() { return std::make_tuple( - std::make_tuple(&ExObject::resource, &atoms::resource)); + std::make_tuple(&ExObject::resource, &atoms::resource), + std::make_tuple(&ExObject::remote_info, &atoms::remote_info)); } }; struct ExError { - ExObject type; - ExObject value; - ExObject traceback; + std::vector lines; ExError() {} - ExError(ExObject type, ExObject value, ExObject traceback) - : type(type), value(value), traceback(traceback) {} + ExError(std::vector lines) : lines(lines) {} static constexpr auto module = &atoms::ElixirPythonxError; static constexpr auto fields() { - return std::make_tuple( - std::make_tuple(&ExError::type, &atoms::type), - std::make_tuple(&ExError::value, &atoms::value), - std::make_tuple(&ExError::traceback, &atoms::traceback)); + return std::make_tuple(std::make_tuple(&ExError::lines, &atoms::lines)); } static constexpr auto is_exception = true; @@ -228,30 +241,91 @@ struct EvalInfo { std::thread::id thread_id; }; -void raise_py_error(ErlNifEnv *env) { +void raise_formatting_error_if_failed(PyObjectPtr py_object) { + if (py_object == NULL) { + throw std::runtime_error("failed while formatting a python error"); + } +} + +void raise_formatting_error_if_failed(const char *buffer) { + if (buffer == NULL) { + throw std::runtime_error("failed while formatting a python error"); + } +} + +void raise_formatting_error_if_failed(Py_ssize_t size) { + if (size == -1) { + throw std::runtime_error("failed while formatting a python error"); + } +} + +ExError build_py_error_from_current(ErlNifEnv *env) { PyObjectPtr py_type, py_value, py_traceback; PyErr_Fetch(&py_type, &py_value, &py_traceback); // If the error indicator was set, type should not be NULL, but value - // and traceback might - + // and traceback might. if (py_type == NULL) { - throw std::runtime_error( - "raise_py_error should only be called when the error indicator is set"); + throw std::runtime_error("build_py_error_from_current should only be " + "called when the error indicator is set"); } auto type = ExObject(fine::make_resource(py_type)); - // Default value and traceback to None object + // Default value and traceback to None object. + py_value = py_value == NULL ? Py_BuildValue("") : py_value; + py_traceback = py_traceback == NULL ? Py_BuildValue("") : py_traceback; + + // Format the exception. Note that if anything raises an error here, + // we throw a runtime exception, instead of a Python one, otherwise + // we could go into an infinite loop. + + auto py_traceback_module = PyImport_ImportModule("traceback"); + raise_formatting_error_if_failed(py_traceback_module); + auto py_traceback_module_guard = PyDecRefGuard(py_traceback_module); - auto value = fine::make_resource( - py_value == NULL ? Py_BuildValue("") : py_value); + auto format_exception = + PyObject_GetAttrString(py_traceback_module, "format_exception"); + raise_formatting_error_if_failed(format_exception); + auto format_exception_guard = PyDecRefGuard(format_exception); - auto traceback = fine::make_resource( - py_traceback == NULL ? Py_BuildValue("") : py_traceback); + auto format_exception_args = PyTuple_Pack(3, py_type, py_value, py_traceback); + raise_formatting_error_if_failed(format_exception_args); + auto format_exception_args_guard = PyDecRefGuard(format_exception_args); - auto error = ExError(type, value, traceback); - fine::raise(env, error); + auto py_lines = PyObject_Call(format_exception, format_exception_args, NULL); + raise_formatting_error_if_failed(py_lines); + auto py_lines_guard = PyDecRefGuard(py_lines); + + auto size = PyList_Size(py_lines); + raise_formatting_error_if_failed(size); + + auto terms = std::vector(); + terms.reserve(size); + + for (Py_ssize_t i = 0; i < size; i++) { + auto py_line = PyList_GetItem(py_lines, i); + raise_formatting_error_if_failed(py_line); + + Py_ssize_t size; + auto buffer = PyUnicode_AsUTF8AndSize(py_line, &size); + raise_formatting_error_if_failed(buffer); + + // The buffer is immutable and lives as long as the Python object, + // so we create the term as a resource binary to make it zero-copy. + Py_IncRef(py_line); + auto ex_object_resource = fine::make_resource(py_line); + auto binary_term = + fine::make_resource_binary(env, ex_object_resource, buffer, size); + + terms.push_back(binary_term); + } + + return ExError(std::move(terms)); +} + +void raise_py_error(ErlNifEnv *env) { + fine::raise(env, build_py_error_from_current(env)); } void raise_if_failed(ErlNifEnv *env, PyObjectPtr py_object) { @@ -284,6 +358,19 @@ ERL_NIF_TERM py_str_to_binary_term(ErlNifEnv *env, PyObjectPtr py_object) { return fine::make_resource_binary(env, ex_object_resource, buffer, size); } +ERL_NIF_TERM py_bytes_to_binary_term(ErlNifEnv *env, PyObjectPtr py_object) { + Py_ssize_t size; + char *buffer; + auto result = PyBytes_AsStringAndSize(py_object, &buffer, &size); + raise_if_failed(env, result); + + // The buffer is immutable and lives as long as the Python object, + // so we create the term as a resource binary to make it zero-copy. + Py_IncRef(py_object); + auto ex_object_resource = fine::make_resource(py_object); + return fine::make_resource_binary(env, ex_object_resource, buffer, size); +} + fine::Ok<> init(ErlNifEnv *env, std::string python_dl_path, ErlNifBinary python_home_path, ErlNifBinary python_executable_path, @@ -785,50 +872,6 @@ ExObject object_repr(ErlNifEnv *env, ExObject ex_object) { FINE_NIF(object_repr, ERL_NIF_DIRTY_JOB_CPU_BOUND); -fine::Term format_exception(ErlNifEnv *env, ExError error) { - ensure_initialized(); - auto gil_guard = PyGILGuard(); - - auto py_traceback_module = PyImport_ImportModule("traceback"); - raise_if_failed(env, py_traceback_module); - auto py_traceback_module_guard = PyDecRefGuard(py_traceback_module); - - auto format_exception = - PyObject_GetAttrString(py_traceback_module, "format_exception"); - raise_if_failed(env, format_exception); - auto format_exception_guard = PyDecRefGuard(format_exception); - - auto py_type = error.type.resource->py_object; - auto py_value = error.value.resource->py_object; - auto py_traceback = error.traceback.resource->py_object; - - auto format_exception_args = PyTuple_Pack(3, py_type, py_value, py_traceback); - raise_if_failed(env, format_exception_args); - auto format_exception_args_guard = PyDecRefGuard(format_exception_args); - - auto py_lines = PyObject_Call(format_exception, format_exception_args, NULL); - raise_if_failed(env, py_lines); - auto py_lines_guard = PyDecRefGuard(py_lines); - - auto size = PyList_Size(py_lines); - raise_if_failed(env, size); - - auto terms = std::vector(); - terms.reserve(size); - - for (Py_ssize_t i = 0; i < size; i++) { - auto py_line = PyList_GetItem(py_lines, i); - raise_if_failed(env, py_line); - - terms.push_back(py_str_to_binary_term(env, py_line)); - } - - return enif_make_list_from_array(env, terms.data(), - static_cast(size)); -} - -FINE_NIF(format_exception, ERL_NIF_DIRTY_JOB_CPU_BOUND); - fine::Term decode_once(ErlNifEnv *env, ExObject ex_object) { ensure_initialized(); auto gil_guard = PyGILGuard(); @@ -987,16 +1030,7 @@ fine::Term decode_once(ErlNifEnv *env, ExObject ex_object) { auto is_bytes = PyObject_IsInstance(py_object, py_bytes_type); raise_if_failed(env, is_bytes); if (is_bytes) { - Py_ssize_t size; - char *buffer; - auto result = PyBytes_AsStringAndSize(py_object, &buffer, &size); - raise_if_failed(env, result); - - // The buffer is immutable and lives as long as the Python object, - // so we create the term as a resource binary to make it zero-copy. - Py_IncRef(py_object); - auto ex_object_resource = fine::make_resource(py_object); - return fine::make_resource_binary(env, ex_object_resource, buffer, size); + return py_bytes_to_binary_term(env, py_object); } auto py_set_type = PyDict_GetItemString(py_builtins, "set"); @@ -1461,6 +1495,86 @@ eval(ErlNifEnv *env, ErlNifBinary code, std::string code_md5, FINE_NIF(eval, ERL_NIF_DIRTY_JOB_CPU_BOUND); +std::variant, fine::Error> +dump_object(ErlNifEnv *env, ExObject ex_object) { + ensure_initialized(); + auto gil_guard = PyGILGuard(); + + std::string pickle_module_name; + PyObjectPtr py_pickle; + + py_pickle = PyImport_ImportModule("cloudpickle"); + if (py_pickle != NULL) { + pickle_module_name = "cloudpickle"; + } else { + // If importing fails, we ignore the error and fallback to the pickle + // module. + PyErr_Clear(); + py_pickle = PyImport_ImportModule("pickle"); + raise_if_failed(env, py_pickle); + pickle_module_name = "pickle"; + } + auto py_pickle_guard = PyDecRefGuard(py_pickle); + + auto py_dumps = PyObject_GetAttrString(py_pickle, "dumps"); + raise_if_failed(env, py_dumps); + auto py_dumps_guard = PyDecRefGuard(py_dumps); + + auto py_dumps_args = PyTuple_Pack(1, ex_object.resource->py_object); + raise_if_failed(env, py_dumps_args); + auto py_dumps_args_guard = PyDecRefGuard(py_dumps_args); + + auto py_dump_bytes = PyObject_Call(py_dumps, py_dumps_args, NULL); + if (py_dump_bytes == NULL) { + return fine::Error(pickle_module_name, + build_py_error_from_current(env)); + } + raise_if_failed(env, py_dump_bytes); + auto py_bytes_guard = PyDecRefGuard(py_dump_bytes); + + return fine::Ok(py_bytes_to_binary_term(env, py_dump_bytes)); +} + +FINE_NIF(dump_object, ERL_NIF_DIRTY_JOB_CPU_BOUND); + +ExObject load_object(ErlNifEnv *env, ErlNifBinary binary) { + ensure_initialized(); + auto gil_guard = PyGILGuard(); + + auto py_pickle = PyImport_ImportModule("pickle"); + raise_if_failed(env, py_pickle); + auto py_pickle_guard = PyDecRefGuard(py_pickle); + + auto py_loads = PyObject_GetAttrString(py_pickle, "loads"); + raise_if_failed(env, py_loads); + auto py_loads_guard = PyDecRefGuard(py_loads); + + auto py_bytes = PyBytes_FromStringAndSize( + reinterpret_cast(binary.data), binary.size); + raise_if_failed(env, py_bytes); + auto py_bytes_guard = PyDecRefGuard(py_bytes); + + auto py_loads_args = PyTuple_Pack(1, py_bytes); + raise_if_failed(env, py_loads_args); + auto py_loads_args_guard = PyDecRefGuard(py_loads_args); + + auto py_object = PyObject_Call(py_loads, py_loads_args, NULL); + raise_if_failed(env, py_object); + + return ExObject(fine::make_resource(py_object)); +} + +FINE_NIF(load_object, ERL_NIF_DIRTY_JOB_CPU_BOUND); + +fine::ResourcePtr create_gc_notifier(ErlNifEnv *env, ErlNifPid pid, + fine::Term term) { + auto message_env = enif_alloc_env(); + auto message_term = enif_make_copy(message_env, term); + return fine::make_resource(pid, message_env, message_term); +} + +FINE_NIF(create_gc_notifier, 0); + } // namespace pythonx FINE_INIT("Elixir.Pythonx.NIF"); @@ -1505,6 +1619,9 @@ extern "C" void pythonx_handle_io_write(const char *message, ErlNifPid janitor_pid; if (enif_whereis_pid(caller_env, janitor_name, &janitor_pid)) { auto device = type == 0 ? eval_info.stdout_device : eval_info.stderr_device; + // Copy the device term is from a differnet env, so we copy it into + // the message env, otherwise we may run into unexpected behaviour. + device = enif_make_copy(env, device); auto msg = fine::encode(env, std::make_tuple(pythonx::atoms::output, std::string(message), device)); diff --git a/lib/pythonx.ex b/lib/pythonx.ex index 6c0afeb..4f0b7ee 100644 --- a/lib/pythonx.ex +++ b/lib/pythonx.ex @@ -325,6 +325,15 @@ defmodule Pythonx do [1] > + ### Remote execution + + If you want to evaluate code on a remote node and get `Pythonx.Object` + back, use `remote_eval/4` instead of `eval/3` to ensure proper + lifetime of the Python objects. + + Pythonx also integrates with `FLAME`. When you call `eval/3` on a + `FLAME` runner, enable the `:track_resources` option, so that the + objects are properly tracked. ''' @spec eval(String.t(), %{optional(String.t()) => term()}, keyword()) :: {Object.t() | nil, %{optional(String.t()) => Object.t()}} @@ -336,23 +345,36 @@ defmodule Pythonx do end opts = Keyword.validate!(opts, [:stdout_device, :stderr_device]) + validate_globals!(globals) globals = for {key, value} <- globals do - if not is_binary(key) do - raise ArgumentError, "expected globals keys to be strings, got: #{inspect(key)}" - end - {key, encode!(value)} end - code_md5 = :erlang.md5(code) - stdout_device = Keyword.get_lazy(opts, :stdout_device, fn -> Process.group_leader() end) stderr_device = Keyword.get_lazy(opts, :stderr_device, fn -> Process.whereis(:standard_error) end) + do_eval(code, globals, stdout_device, stderr_device) + end + + defp pythonx_started?() do + Process.whereis(Pythonx.Supervisor) != nil + end + + defp validate_globals!(globals) do + for {key, _value} <- globals do + if not is_binary(key) do + raise ArgumentError, "expected globals keys to be strings, got: #{inspect(key)}" + end + end + end + + defp do_eval(code, globals, stdout_device, stderr_device) do + code_md5 = :erlang.md5(code) + result = Pythonx.NIF.eval(code, code_md5, globals, stdout_device, stderr_device) # Wait for the janitor to process all output messages received @@ -363,10 +385,6 @@ defmodule Pythonx do result end - defp pythonx_started?() do - Process.whereis(Pythonx.Supervisor) != nil - end - @doc ~S''' Convenience macro for Python code evaluation. @@ -553,4 +571,178 @@ defmodule Pythonx do "Note that Pythonx.eval/2 or the ~PY sigil result in nil, if the " <> "evaluated code ends with a statement, rather than expression" end + + @doc """ + Creates a local copy of a remote `Pythonx.Object`. + + Remote objects can be returned when using `remote_eval/4` or + evaluating on a `FLAME` runner. This function makes a local copy + of such objects, so that they can be passed to local `eval/3`, + if desired. + + If a local object is given, it is returned as is. + + ### Pickling + + The object are copied across nodes in a serialized format provided + by the Python's build-in [`pickle`](https://docs.python.org/3/library/pickle.html) + module. While `pickle` supports basic Python types, and libraries + implement custom pickling logic, certain Python values cannot be + pickled by default (for example, local functions and lambdas). + If you run into this limitation, you can add the `cloudpickle` + package for extended pickling support: + + ```text + cloudpickle==3.1.2 + ``` + """ + @spec copy_remote_object(Pythonx.Object.t()) :: Pythonx.Object.t() + def copy_remote_object(%Pythonx.Object{} = object) when node(object.resource) == node() do + object + end + + def copy_remote_object(%Pythonx.Object{} = object) do + node = node(object.resource) + + case :erpc.call(node, __MODULE__, :__dump__, [object]) do + {:ok, binary} -> + Pythonx.NIF.load_object(binary) + + {:error, "pickle", %Pythonx.Error{} = error} -> + raise ArgumentError, """ + failed to serialize the given object using the built-in pickle module. The pickle module does not support all object types, for extended pickling support add the following package: + + cloudpickle==3.1.2 + + Original error: #{Exception.message(error)} + """ + + {:error, module, %Pythonx.Error{} = error} -> + raise RuntimeError, """ + failed to serialize the given object using the #{module} module. + + Original error: #{Exception.message(error)} + """ + + {:exception, exception} -> + raise exception + end + end + + @doc false + def __dump__(object) do + try do + Pythonx.NIF.dump_object(object) + rescue + error -> {:exception, error} + end + end + + @doc """ + Evaluates the Python `code` on `node`. + + Any local Pythonx objects passed in `globals` will automatically be + copied into the remote node. The returned result and globals are + remote Pythonx objects, see the note below. + + For more details and options, see `eval/3`. + + > #### Remote Pythonx objects {: .warning} + > + > The result and globals returned by `remote_eval/4` are `Pythonx.Object` + > structs, however those point to Python objects allocated on the + > remote node, where the evaluation run. It is guaranteed that those + > Python objects are kept alive, as long as you keep a reference to + > the local `Pythonx.Object` structs. + > + > Avoid sending `Pythonx.Object` structs across nodes via regular + > messages or `:erpc.call/4` invocations. When doing so, there is + > no guarantee that the corresponding Python objects are kept around. + > + > Also note that calling `eval/3` does not allow remote objects to + > be passed in globals. If you want to do that, you need to explicitly + > call `copy_remote_object/1` first to get a local copy of the Python + > object. + """ + @spec remote_eval(node(), String.t(), %{optional(String.t()) => term()}, keyword()) :: + {Object.t() | nil, %{optional(String.t()) => Object.t()}} + def remote_eval(node, code, globals, opts \\ []) do + opts = Keyword.validate!(opts, [:stdout_device, :stderr_device]) + validate_globals!(globals) + + stdout_device = Keyword.get_lazy(opts, :stdout_device, fn -> Process.group_leader() end) + + stderr_device = + Keyword.get_lazy(opts, :stderr_device, fn -> Process.whereis(:standard_error) end) + + message_ref = :erlang.make_ref() + remote_args = [self(), message_ref, code, globals, stdout_device, stderr_device] + child = Node.spawn(node, __MODULE__, :__remote_eval__, remote_args) + monitor_ref = Process.monitor(child) + + receive do + {^message_ref, {:ok, {result, globals}}} -> + Process.demonitor(monitor_ref, [:flush]) + result = track_object(result) + globals = Map.new(globals, fn {key, object} -> {key, track_object(object)} end) + send(child, {message_ref, :ok}) + {result, globals} + + {^message_ref, {:exception, error}} -> + Process.demonitor(monitor_ref, [:flush]) + send(child, {message_ref, :ok}) + raise error + + {:DOWN, ^monitor_ref, :process, _pid, reason} -> + exit(reason) + end + end + + @doc false + def __remote_eval__(parent, message_ref, code, globals, stdout_device, stderr_device) do + monitor_ref = Process.monitor(parent) + + result = + try do + globals = + for {key, value} <- globals do + {key, encode!(value, &encode_with_copy_remote/2)} + end + + result = do_eval(code, globals, stdout_device, stderr_device) + + {:ok, result} + rescue + error -> {:exception, error} + end + + send(parent, {message_ref, result}) + + receive do + {^message_ref, :ok} -> + Process.demonitor(monitor_ref, [:flush]) + # Call a remote function to prevent garbage collection from + # happening until the caller tracks the pythonx objects. + Pythonx.ObjectTracker.identity(result) + + {:DOWN, ^monitor_ref, :process, _pid, reason} -> + exit(reason) + end + end + + defp encode_with_copy_remote(%Pythonx.Object{} = object, encoder) + when node(object.resource) != node() do + object + |> copy_remote_object() + |> encoder.(encoder) + end + + defp encode_with_copy_remote(value, encoder), do: Pythonx.Encoder.encode(value, encoder) + + defp track_object(object) do + case Pythonx.ObjectTracker.track_remote_object(object) do + {:noop, object} -> object + {:ok, object, _marker_pid} -> object + end + end end diff --git a/lib/pythonx/application.ex b/lib/pythonx/application.ex index f950ba5..eb7b1da 100644 --- a/lib/pythonx/application.ex +++ b/lib/pythonx/application.ex @@ -8,7 +8,8 @@ defmodule Pythonx.Application do enable_sigchld() children = [ - Pythonx.Janitor + Pythonx.Janitor, + Pythonx.ObjectTracker ] opts = [strategy: :one_for_one, name: Pythonx.Supervisor] diff --git a/lib/pythonx/encoder.ex b/lib/pythonx/encoder.ex index 919f84f..6697613 100644 --- a/lib/pythonx/encoder.ex +++ b/lib/pythonx/encoder.ex @@ -78,6 +78,14 @@ defprotocol Pythonx.Encoder do end defimpl Pythonx.Encoder, for: Pythonx.Object do + def encode(object, _encoder) when node(object.resource) != node() do + raise Protocol.UndefinedError, + protocol: @protocol, + value: object, + description: + "remote objects cannot be encoded implicitly, call Pythonx.copy_remote_object/1 first to get a local copy of the object" + end + def encode(object, _encoder) do object end diff --git a/lib/pythonx/error.ex b/lib/pythonx/error.ex index 4732e60..f39adce 100644 --- a/lib/pythonx/error.ex +++ b/lib/pythonx/error.ex @@ -3,20 +3,14 @@ defmodule Pythonx.Error do An exception raised when Python raises an exception. """ - defexception [:type, :value, :traceback] + defexception [:lines] - @type t :: %{ - type: Pythonx.Object.t(), - value: Pythonx.Object.t(), - traceback: Pythonx.Object.t() - } + @type t :: %__MODULE__{lines: [String.t()]} @impl true def message(error) do - lines = Pythonx.NIF.format_exception(error) - lines = - Enum.map(lines, fn line -> + Enum.map(error.lines, fn line -> [" ", line] end) diff --git a/lib/pythonx/nif.ex b/lib/pythonx/nif.ex index 6cf1e4e..405f1fd 100644 --- a/lib/pythonx/nif.ex +++ b/lib/pythonx/nif.ex @@ -33,9 +33,13 @@ defmodule Pythonx.NIF do def set_add(_object, _key), do: err!() def pid_new(_pid), do: err!() def object_repr(_object), do: err!() - def format_exception(_error), do: err!() def decode_once(_object), do: err!() def eval(_code, _code_md5, _globals, _stdout_device, _stderr_device), do: err!() + def dump_object(_object), do: err!() + def load_object(_object), do: err!() + + def create_gc_notifier(_pid, _message), do: err!() + defp err!(), do: :erlang.nif_error(:not_loaded) end diff --git a/lib/pythonx/object.ex b/lib/pythonx/object.ex index 2573264..fe2daec 100644 --- a/lib/pythonx/object.ex +++ b/lib/pythonx/object.ex @@ -6,9 +6,9 @@ defmodule Pythonx.Object do Elixir code. """ - defstruct [:resource] + defstruct [:resource, :remote_info] - @type t :: %__MODULE__{resource: reference()} + @type t :: %__MODULE__{} end defimpl Inspect, for: Pythonx.Object do @@ -17,10 +17,16 @@ defimpl Inspect, for: Pythonx.Object do alias Pythonx.Object def inspect(%Object{} = object, _opts) do - repr_string = - object - |> Pythonx.NIF.object_repr() - |> Pythonx.NIF.unicode_to_string() + object_node = node(object.resource) + + remote = + if object_node != node() do + concat([line(), "[", "node: ", Atom.to_string(object_node), "]"]) + else + empty() + end + + repr_string = :erpc.call(object_node, __MODULE__, :__repr_string__, [object]) repr_lines = String.split(repr_string, "\n") inner = Enum.map_intersperse(repr_lines, line(), &string/1) @@ -28,10 +34,16 @@ defimpl Inspect, for: Pythonx.Object do force_unfit( concat([ "#Pythonx.Object<", - nest(concat([line() | inner]), 2), + nest(concat([remote, line() | inner]), 2), line(), ">" ]) ) end + + def __repr_string__(%Object{} = object) do + object + |> Pythonx.NIF.object_repr() + |> Pythonx.NIF.unicode_to_string() + end end diff --git a/lib/pythonx/object_tracker.ex b/lib/pythonx/object_tracker.ex new file mode 100644 index 0000000..f423417 --- /dev/null +++ b/lib/pythonx/object_tracker.ex @@ -0,0 +1,204 @@ +defmodule Pythonx.ObjectTracker do + @moduledoc false + + # Tracks objects that other nodes point to, effectively implementing + # distributed garbage collection. + # + # Each Pythonx object uses a NIF resource, which resides in the + # memory of the node where it was created (owner). Such object can + # be sent to another node (peer), however once the owner node has + # no more references to the resource, it gets garbage collected. If + # the peer node does another RPC to operate on the object, it will + # fail, as the resource no longer exists. To address this, the owner + # node needs to keep the resource reference around, as long as any + # peer node points to it. This is precisely what this process does. + # + # We don't know when an object is sent to a peer node, so the peer + # node needs to explicitly request for the object to be tracked by + # calling `track_remote_object/1`. Tracking an object has two main + # parts: + # + # 1. The resource reference is stored in the ObjectTracker + # process on the owner node, preventing the object form being + # garbage collected. + # + # 2. The peer node creates a local garbage collection notifier + # and stores it in the local `%Pythonx.Object{}`. Once that + # local struct is garbage collected, the notifier sends a + # message and eventually the reference from point 1. is removed. + # + # Note that for this to work correctly, the object must be kept + # alive on the owner node, until the peer calls `track_remote_object/1`. + # This, for example, is guaranteed in FLAME when implementing the + # FLAME.Trackable protocol. + + use GenServer + + @name __MODULE__ + + def start_link(_opts) do + GenServer.start_link(__MODULE__, :ok, name: @name) + end + + @doc """ + An identity function to prevent GC. + """ + @spec identity(term()) :: term() + def identity(data), do: data + + @doc """ + Locates the ObjectTracker process. + """ + @spec whereis!() :: pid() + def whereis!() do + Process.whereis(@name) || exit({:noproc, {__MODULE__, :whereis!, []}}) + end + + @doc """ + Starts tracking a remote object, to prevent it from being garbage + collected. + + If the object is local, or already tracked locally, it is returned + as is. + + Returns an updated object and a marker PID. The marker is a dummy + process that stays alive as long as the object's owner node tracks + any references. The marker is specifically designed for implementing + `FLAME.Trackable`, and can be ignored otherwise. + """ + @spec track_remote_object(Pythonx.Object.t()) :: + {:ok, Pythonx.Object.t(), pid()} | {:noop, Pythonx.Object.t()} + def track_remote_object(%Pythonx.Object{resource: ref} = object) + when node(ref) == node() do + # Local object. + {:noop, object} + end + + def track_remote_object(%Pythonx.Object{remote_info: gc_notifier} = object) + when node(gc_notifier) == node() do + # Already tracked by this node. + {:noop, object} + end + + def track_remote_object(%Pythonx.Object{resource: remote_ref} = object) do + local_pid = whereis!() + remote_pid = :erpc.call(node(remote_ref), __MODULE__, :whereis!, []) + gc_notifier = Pythonx.NIF.create_gc_notifier(local_pid, {:local_gc, remote_pid, remote_ref}) + marker_pid = GenServer.call(remote_pid, {:track, remote_ref, local_pid}, :infinity) + object = %{object | remote_info: gc_notifier} + {:ok, object, marker_pid} + end + + @impl true + def init(:ok) do + {:ok, %{pid_refs: %{}, pid_monitors: %{}, marker_pid: nil}} + end + + @impl true + def handle_call({:track, ref, pid}, _from, state) do + pid_monitors = Map.put_new_lazy(state.pid_monitors, pid, fn -> Process.monitor(pid) end) + pid_refs = add_ref(state.pid_refs, pid, ref) + state = ensure_marker(state) + {:reply, state.marker_pid, %{state | pid_refs: pid_refs, pid_monitors: pid_monitors}} + end + + @impl true + def handle_continue(:gc, state) do + :erlang.garbage_collect() + {:noreply, state} + end + + @impl true + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + state = %{ + state + | pid_refs: Map.delete(state.pid_refs, pid), + pid_monitors: Map.delete(state.pid_monitors, pid) + } + + state = maybe_stop_marker(state) + {:noreply, state, {:continue, :gc}} + end + + def handle_info({:untrack, ref, pid}, state) do + pid_refs = remove_ref(state.pid_refs, pid, ref) + + state = + if Map.fetch!(pid_refs, pid) == %{} do + Process.demonitor(state.pid_monitors[pid], [:flush]) + + %{ + state + | pid_monitors: Map.delete(state.pid_monitors, pid), + pid_refs: Map.delete(pid_refs, pid) + } + else + %{state | pid_refs: pid_refs} + end + + state = maybe_stop_marker(state) + {:noreply, state, {:continue, :gc}} + end + + def handle_info({:local_gc, remote_pid, remote_ref}, state) do + send(remote_pid, {:untrack, remote_ref, self()}) + {:noreply, state} + end + + defp ensure_marker(%{marker_pid: nil} = state) do + pid = + spawn_link(fn -> + receive do + :stop -> :ok + end + end) + + %{state | marker_pid: pid} + end + + defp ensure_marker(state), do: state + + defp maybe_stop_marker(state) when state.marker_pid != nil and state.pid_refs == %{} do + send(state.marker_pid, :stop) + %{state | marker_pid: nil} + end + + defp maybe_stop_marker(state), do: state + + defp add_ref(pid_refs, pid, ref) do + case pid_refs do + %{^pid => %{^ref => count} = refs} -> + %{pid_refs | pid => %{refs | ref => count + 1}} + + %{^pid => refs} -> + %{pid_refs | pid => Map.put(refs, ref, 1)} + + %{} -> + Map.put(pid_refs, pid, %{ref => 1}) + end + end + + defp remove_ref(pid_refs, pid, ref) do + case pid_refs do + %{^pid => %{^ref => 1} = refs} -> + %{pid_refs | pid => Map.delete(refs, ref)} + + %{^pid => %{^ref => count} = refs} -> + %{pid_refs | pid => %{refs | ref => count - 1}} + + %{} -> + pid_refs + end + end +end + +if Code.ensure_loaded?(FLAME.Trackable) do + defimpl FLAME.Trackable, for: [Pythonx.Object] do + def track(object, acc, _node) do + case Pythonx.ObjectTracker.track_remote_object(object) do + {:noop, object} -> {object, acc} + {:ok, object, marker_pid} -> {object, [marker_pid | acc]} + end + end + end +end diff --git a/mix.exs b/mix.exs index 3d66390..1558ca0 100644 --- a/mix.exs +++ b/mix.exs @@ -39,6 +39,7 @@ defmodule Pythonx.MixProject do defp deps do [ + {:flame, "~> 0.5", optional: true}, {:fine, "~> 0.1.2", runtime: false}, {:elixir_make, "~> 0.9", runtime: false}, {:cc_precompiler, "~> 0.1", runtime: false}, diff --git a/mix.lock b/mix.lock index fb04431..5f18da6 100644 --- a/mix.lock +++ b/mix.lock @@ -4,6 +4,7 @@ "elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"}, "ex_doc": {:hex, :ex_doc, "0.36.1", "4197d034f93e0b89ec79fac56e226107824adcce8d2dd0a26f5ed3a95efc36b1", [:mix], [{:earmark_parser, "~> 1.4.42", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d7d26a7cf965dacadcd48f9fa7b5953d7d0cfa3b44fa7a65514427da44eafd89"}, "fine": {:hex, :fine, "0.1.2", "85cf7dd190c7c6c54c2840754ae977c9acc0417316255b674fad9f2678e4ecc7", [:mix], [], "hexpm", "9113531982c2b60dbea6c7233917ddf16806947cd7104b5d03011bf436ca3072"}, + "flame": {:hex, :flame, "0.5.3", "af9a16c902dac100b4f6b91e64c10ef95f9785ef7b638fcfcebabbec682990e6", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, ">= 0.0.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "8b0b42026c1df2eeffdd8860b77af8437ab7aa72b5e648f8e92d8198f3b7fa1e"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, diff --git a/test/pythonx_test.exs b/test/pythonx_test.exs index 84fdee0..796f50c 100644 --- a/test/pythonx_test.exs +++ b/test/pythonx_test.exs @@ -477,6 +477,236 @@ defmodule PythonxTest do end end + describe "remote evaluation" do + @describetag :distributed + + @peer1 :"peer1@127.0.0.1" + @peer2 :"peer2@127.0.0.1" + + test "remote_eval/4 returns remote objects" do + {result, globals} = + Pythonx.remote_eval( + @peer1, + """ + x = 1 + x + """, + %{} + ) + + assert inspect(result) == """ + #Pythonx.Object< + [node: peer1@127.0.0.1] + 1 + >\ + """ + + assert inspect(globals["x"]) == """ + #Pythonx.Object< + [node: peer1@127.0.0.1] + 1 + >\ + """ + + # Hidden node. + + {result, globals} = + Pythonx.remote_eval( + @peer2, + """ + x = 1 + x + """, + %{} + ) + + assert inspect(result) == """ + #Pythonx.Object< + [node: peer2@127.0.0.1] + 1 + >\ + """ + + assert inspect(globals["x"]) == """ + #Pythonx.Object< + [node: peer2@127.0.0.1] + 1 + >\ + """ + end + + test "garbage collects only once the caller has no reference to the object" do + {py_test_object, %{}} = + Pythonx.remote_eval( + @peer1, + """ + import os + + os.environ["TEST_OBJECT_DELETED"] = "false" + + class TestObject: + def call_me(self): + return "maybe" + + def __del__(self): + os.environ["TEST_OBJECT_DELETED"] = "true" + + TestObject() + """, + %{} + ) + + :erpc.call(@peer1, :erlang, :garbage_collect, []) + + # Can pass the object to another evaluation. + {result, %{}} = + Pythonx.remote_eval( + @peer1, + """ + test_object.call_me() + """, + %{"test_object" => py_test_object} + ) + + assert inspect(result) =~ "maybe" + + Pythonx.remote_eval(@peer1, "import gc; gc.collect()", %{}) + + {result, %{}} = + Pythonx.remote_eval(@peer1, "import os; os.environ['TEST_OBJECT_DELETED']", %{}) + + assert inspect(result) =~ "false" + + # Hold a reference up until this point. + List.flatten([py_test_object]) + :erlang.garbage_collect(self()) + + Pythonx.remote_eval(@peer1, "import gc; gc.collect()", %{}) + + # Now it should be garbage collected. + {result, %{}} = + Pythonx.remote_eval(@peer1, "import os; os.environ['TEST_OBJECT_DELETED']", %{}) + + assert inspect(result) =~ "true" + end + + test "remote_eval/4 sends standard output to caller's group leader" do + assert ExUnit.CaptureIO.capture_io(fn -> + Pythonx.remote_eval( + @peer1, + """ + print("hello from Python") + """, + %{} + ) + end) == "hello from Python\n" + + # Python thread spawned by the evaluation + assert ExUnit.CaptureIO.capture_io(fn -> + Pythonx.remote_eval( + @peer1, + """ + import threading + + def run(): + print("hello from thread") + + thread = threading.Thread(target=run) + thread.start() + thread.join() + """, + %{} + ) + end) == "hello from thread\n" + end + + test "remote_eval/4 automatically copies objects in globals into the remote node" do + {one, %{}} = Pythonx.eval("1", %{}) + + {result, globals} = Pythonx.remote_eval(@peer1, "one + 2", %{"one" => one}) + + assert inspect(result) == """ + #Pythonx.Object< + [node: peer1@127.0.0.1] + 3 + >\ + """ + + assert inspect(globals["one"]) == """ + #Pythonx.Object< + [node: peer1@127.0.0.1] + 1 + >\ + """ + end + + test "copy_remote_object/1 makes a local copy of a remote object" do + {result, %{}} = Pythonx.remote_eval(@peer1, "1", %{}) + + assert inspect(result) == """ + #Pythonx.Object< + [node: peer1@127.0.0.1] + 1 + >\ + """ + + local = Pythonx.copy_remote_object(result) + + assert inspect(local) == """ + #Pythonx.Object< + 1 + >\ + """ + end + + test "copy_remote_object/1 uses cloudpickle if available" do + # The built-in pickle module does not support lambdas, but cloudpickle does. + {square_it, %{}} = Pythonx.remote_eval(@peer1, "lambda x: x * x", %{}) + + {result, %{}} = + Pythonx.eval("square_it(2)", %{"square_it" => Pythonx.copy_remote_object(square_it)}) + + assert repr(result) == "4" + end + + test "copy_remote_object/1 keeps already local object as is" do + {result, %{}} = Pythonx.eval("1", %{}) + + assert Pythonx.copy_remote_object(result) == result + end + + test "encode!/1 fails for remote objects" do + {result, %{}} = Pythonx.remote_eval(@peer1, "1", %{}) + + assert_raise Protocol.UndefinedError, ~r/remote objects cannot be encoded implicitly/, fn -> + Pythonx.encode!(result) + end + end + + test "FLAME.Trackable.track/3 pid terminates once there are no more object references" do + {untracked_object, _binding} = + :erpc.call(@peer1, Code, :eval_quoted, [ + quote do + {result, %{}} = Pythonx.eval("1", %{}) + # Keep the object around, since we track it only later. + Agent.start(fn -> result end) + result + end + ]) + + {tracked_object, [pid]} = FLAME.Trackable.track(untracked_object, [], @peer1) + + ref = Process.monitor(pid) + assert :erpc.call(@peer1, Process, :alive?, [pid]) + + # Hold a reference up until this point. + List.flatten([tracked_object]) + :erlang.garbage_collect(self()) + + assert_receive {:DOWN, ^ref, _, _, _} + end + end + defp repr(object) do assert %Pythonx.Object{} = object diff --git a/test/test_helper.exs b/test/test_helper.exs index 331a8e4..0545b01 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -6,8 +6,44 @@ name = "project" version = "0.0.0" requires-python = "==3.#{python_minor}.*" dependencies = [ - "numpy==2.1.2" + "numpy==2.1.2", + "cloudpickle==3.1.2" ] """) -ExUnit.start() +try_starting_epmd? = fn -> + case :os.type() do + {:unix, _} -> + {"", 0} == System.cmd("epmd", ["-daemon"]) + + _ -> + true + end +end + +exclude = + cond do + :distributed in Keyword.get(ExUnit.configuration(), :exclude, []) -> + [] + + try_starting_epmd?.() and match?({:ok, _}, Node.start(:"primary@127.0.0.1", :longnames)) -> + env = + for {key, value} <- Pythonx.install_env() do + {String.to_charlist(key), String.to_charlist(value)} + end + + {:ok, _pid, peer1} = :peer.start(%{name: :"peer1@127.0.0.1", env: env}) + {:ok, _pid, peer2} = :peer.start(%{name: :"peer2@127.0.0.1", env: env, args: ~w(-hidden)c}) + + for node <- [peer1, peer2] do + true = :erpc.call(node, :code, :set_path, [:code.get_path()]) + {:ok, _} = :erpc.call(node, :application, :ensure_all_started, [:pythonx]) + end + + [] + + true -> + [:distributed] + end + +ExUnit.start(exclude: exclude) From e0baf812643059c85bd66f7fa58a084e1fba1ef6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Mon, 2 Mar 2026 22:39:21 +0100 Subject: [PATCH 2/3] Update c_src/pythonx.cpp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: José Valim --- c_src/pythonx.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/pythonx.cpp b/c_src/pythonx.cpp index 07d41af..f8f1149 100644 --- a/c_src/pythonx.cpp +++ b/c_src/pythonx.cpp @@ -1619,7 +1619,7 @@ extern "C" void pythonx_handle_io_write(const char *message, ErlNifPid janitor_pid; if (enif_whereis_pid(caller_env, janitor_name, &janitor_pid)) { auto device = type == 0 ? eval_info.stdout_device : eval_info.stderr_device; - // Copy the device term is from a differnet env, so we copy it into + // The device term is from a different env, so we copy it into // the message env, otherwise we may run into unexpected behaviour. device = enif_make_copy(env, device); From 51b1de0faf4ccc3ad5d2f59bc64d0cb30377f874 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Mon, 2 Mar 2026 22:47:50 +0100 Subject: [PATCH 3/3] gc self --- lib/pythonx/object_tracker.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pythonx/object_tracker.ex b/lib/pythonx/object_tracker.ex index f423417..75a350b 100644 --- a/lib/pythonx/object_tracker.ex +++ b/lib/pythonx/object_tracker.ex @@ -104,7 +104,7 @@ defmodule Pythonx.ObjectTracker do @impl true def handle_continue(:gc, state) do - :erlang.garbage_collect() + :erlang.garbage_collect(self()) {:noreply, state} end