7.8
Distributed Places
See also Distributed Places in The Racket Guide.
Distributed places support programs whose computation may span
physical machines. The design relies on machine nodes that
perform computation. The programmer configures a new distributed
system using a declarative syntax and callbacks. A node begins life
with one initial place: the message router. After a node has been configured, its
message router is activated by calling the message-router
function. The message router listens on a TCP port for incoming
connections from other nodes in the distributed system. Places can be
spawned within the node by sending place-spawn request messages to the
node’s message router.
The distributed places implementation relies on two assumptions:
The user’s ".ssh/config" and
".ssh/authorized_keys" files are configured correctly to
allow passwordless connection to remote hosts via public key authentication.
Distributed places does not support the specification of ssh usernames.
If a non-default ssh username is required, the ".ssh/config" file
should be used to specify the username.
All machines run the same version of Racket. Future versions of distributed
places may use the zo binary data format for serialization.
The following example illustrates a configuration and use of
distributed places that starts a new node on the current machine and
passes it a "Hello World" string:
Example:
> (module hello-world-example racket/base
    (require racket/place/distributed
             racket/place)

    (provide hello-world)

    (define (hello-world)
      (place ch
        (printf "hello-world received: ~a\n" (place-channel-get ch))
        (place-channel-put ch "Hello World\n")
        (printf "hello-world sent: Hello World\n")))

    (module+ main
      ; 1) spawns a node running at "localhost" and listening on port
      ;    6344 for incoming connections.
      ; 2) connects to the node running at localhost:6344 and creates a
      ;    place on that node by calling the hello-world procedure from
      ;    the current module.
      ; 3) returns a remote-node% instance (node) and a
      ;    remote-connection% instance (pl) for communicating with the
      ;    new node and place
      (define-values (node pl)
        (spawn-node-supervise-place-at "localhost"
                                       #:listen-port 6344
                                       #:thunk #t
                                       (quote-module-path "..")
                                       'hello-world))

      ; starts a message router which adds three event-container<%>s to
      ; its list of events to handle: the node and two after-seconds
      ; event containers. Two seconds after the launch of the message-router, a
      ; message will be sent to the pl place. After six seconds, the
      ; program and all spawned nodes and places will terminate.
      (message-router
        node
        (after-seconds 2
          (*channel-put pl "Hello")
          (printf "message-router received: ~a\n" (*channel-get pl)))

        (after-seconds 6
          (exit 0)))))
Waits in an endless loop for one of many events to become ready. The
message-router procedure constructs a
node%
instance to serve as the message router for the node. The
message-router procedure then adds all the declared
event-container<%>s to the
node% and finally calls
the never ending loop
sync-events method, which handles
events for the node.
Spawns a new remote node at hostname with one instance place specified by
the instance-module-path and instance-place-function-name.
When thunk is #f, the place is created as the result of the framework
calling (dynamic-place instance-module-path instance-place-function-name)
in the new node.
When thunk is #t the
instance-place-function-name function should use
dynamic-place or place to create and return an
initial place in the new node.
When the place-name symbol is present a named place is
created. The place-name symbol is used to establish later
connections to the named place.
The result is a remote-node% instance, not a
remote-connection%. Use get-first-place on the result to obtain a remote-connection%.
The restart-on-exit argument can be #t to instruct
the remote-connection% instance to respawn the place on the
remote node should it exit or terminate at any time. It can also be a
procedure of zero arguments to implement the restart procedure, or it
can be an object that supports a restart method that takes a place
argument.
(spawn-node-supervise-place-at
    hostname
    instance-module-path
    instance-place-function-name
   [#:listen-port port
    #:initial-message initial-message
    #:racket-path racket-path
    #:ssh-bin-path ssh-path
    #:distributed-launch-path launcher-path
    #:restart-on-exit restart-on-exit
    #:named named
    #:thunk thunk])
  hostname : string?
  instance-module-path : module-path?
  instance-place-function-name : symbol?
  port : port-no? = DEFAULT-ROUTER-PORT
  initial-message : any = #f
  racket-path : string-path? = (racket-path)
  ssh-path : string-path? = (ssh-bin-path)
  launcher-path : string-path? = (path->string distributed-launch-path)
  restart-on-exit : any/c = #f
  named : (or/c #f string?) = #f
  thunk : (or/c #f #t) = #f
Spawns a new remote node at
hostname and returns a
remote-node% handle.
When
thunk is
#f, creates a new place on
remote-node by using
dynamic-place to invoke
instance-place-function-name from the module
instance-module-path.
When thunk is #t, creates a new place at remote-node by executing the thunk
exported as instance-place-function-name from the module
instance-module-path. The function should use
dynamic-place or place to create and return a place in the new
node.
When the place-name symbol is present a named place is
created. The place-name symbol is used to establish later
connections to the named place.
Spawns an attached external process at host hostname.
Creates a new thread on the
remote-node by using
dynamic-require to invoke
instance-place-function-name from the module
instance-module-path.
Returns a
restarter% instance that should be supplied to a
#:restart-on-exit argument.
Returns an
after-seconds% instance that should be supplied to
a
message-router. The
after-seconds% instance
executes the
bodys after a delay of
N seconds from the
start of the event loop, where
N is the result of
seconds-expr.
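As a rough illustration of the timing rule above (each delay is measured from the start of the event loop, not from the previous event), the following plain-Racket sketch runs thunks at their offsets. It does not use after-seconds% or message-router: run-after-seconds and fired are hypothetical names introduced here, and the sketch makes no claim about how the library schedules its events internally.

```racket
#lang racket/base

;; Hypothetical stand-in for an event loop with timed entries.
;; entries: list of (cons delay-in-seconds thunk)
(define (run-after-seconds entries)
  ;; All delays are measured from this single starting time.
  (define start (current-inexact-milliseconds))
  (for ([e (in-list (sort entries < #:key car))])
    ;; alarm-evt becomes ready once the given time (in milliseconds) passes.
    (sync (alarm-evt (+ start (* 1000 (car e)))))
    ((cdr e))))

(define fired '())
(run-after-seconds
 (list (cons 0.2 (lambda () (set! fired (cons 'second fired))))
       (cons 0.1 (lambda () (set! fired (cons 'first fired))))))
(reverse fired) ; the 0.1 s entry runs before the 0.2 s entry
```

Because both alarms are computed from the same start time, the second thunk fires roughly 0.2 seconds after the loop begins rather than 0.3 seconds.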
Logs a message at the root node.
The
cmdline-list is a list of command line arguments of type
string and/or
path.
The
parent argument is a
remote-node% instance that will be notified when the process dies via
a
(send parent process-died this) call.
The
pch argument is a
place-channel. Messages
received on
pch are forwarded to the socket-connection%
sch via a
dcgm message, e.g.
(sconn-write-flush sch (dcgm DCGM-TYPE-INTER-DCHANNEL id id msg))
The
id is an
exact-positive-integer that identifies
the socket-connection subchannel for this inter-node place connection.
When a
host and
port are supplied, a new TCP
connection is established. If an
input-port? and an
output-port? are supplied as
in and
out,
the ports are used as a connection to the remote host. The
retry-times argument specifies how many times to retry the
connection attempt should it fail to connect and defaults to 30 retry
attempts. Often a remote node is still booting up when a connection
is attempted and the connection needs to be retried several times.
The
delay argument specifies how many seconds to wait between
retry attempts. The
background-connect? argument defaults to
#t and specifies that the constructor should retry
immediately and that connection establishment should occur in the
background. Finally, the
remote-node argument specifies the
remote-node% instance that should be notified should the
connection fail.
The
node% instance controls a distributed places node. It
launches places and routes inter-node place messages in the
distributed system. The
message-router form constructs a
node% instance under the hood. Newly spawned nodes also have
a
node% instance in their initial place that serves as the
node’s message router.
Constructs a
node% that will listen on
listen-port for inter-node connections.
Starts the never ending event loop for this distributed places node.
Constructs a
node% that will listen on
listen-port for inter-node connections.
When set to true the restart-on-exit parameter causes the
specified node to be restarted when the ssh session spawning the node
dies.
Returns the communication channel for the first place spawned on this node.
Returns
(format "PLACE ~a:~a" host-name listen-port)
Launches a place on the remote node represented by this
remote-node% instance.
The place-exec argument describes how the remote place should be launched,
and it should have one of the following shapes:
(list 'place place-module-path place-thunk)
(list 'dynamic-place place-module-path place-func)
The difference between these two launching methods is that
the
'place version of
place-exec expects a
thunk to be exported by the module
place-module-path. Executing the thunk is expected to
create a new place and return a place descriptor to the newly
created place. The
'dynamic-place version of
place-exec expects place-func to be a function taking a
single argument, the initial channel argument, and calls
dynamic-place on behalf of the user and creates the new
place from the
place-module-path and
place-func.
The restart-on-exit argument is treated in the same way
as for spawn-node-with-dynamic-place-at.
The one-sided-place? argument is an internal use
argument for launching remote places from within a place using
the old design pattern.
Connects to a named place on the remote node represented by this
remote-node% instance.
Sends a message instructing the remote node represented by this
remote-node% instance to exit immediately.
Sends node a message telling it to exit immediately.
Waits for
place to terminate.
The
remote-connection%
instance provides a remote API to a place
running on a remote distributed places node. It launches a
place or connects to a named place and routes inter-node place messages to the remote place.
The place-exec argument describes how the remote place should be launched
in the same way as for launch-place in remote-node%.
The restart-on-exit argument is treated in the same way
as for spawn-node-with-dynamic-place-at.
The one-sided-place? argument is an internal use
argument for launching remote places from within a place using
the old design pattern.
See set-on-channel! for description of on-channel argument.
Installs a handler function that handles messages from the remote place.
The setup/distributed-docs module uses this callback to handle job completion messages.
The
place% instance represents a place launched on a
distributed places node at that node. It launches a compute place and
routes inter-node place messages to the place.
Constructs a
remote-connection% instance.
The
place-exec argument describes how the remote place should be launched
in the same way as for
launch-place in
remote-node%.
The
ch-id and
sc arguments are internally used to
establish routing between the remote node spawning this place and the
place itself. The
on-place-dead callback handles the event
when the newly spawned place terminates.
Blocks and waits for the subprocess representing the
remote-node% to exit.
The
connection% instance represents a connection to a
named-place instance running on the current node. It routes inter-node
place messages to the named place.
Constructs a
remote-connection% instance.
The
place-exec argument describes how the remote place should be launched
in the same way as for
launch-place in
remote-node%.
The
ch-id and
sc arguments are internally used to
establish routing between the remote node and this named-place.
The
respawn-and-fire% instance represents a thunk that should
execute every
n seconds.
The
after-seconds% instance represents a thunk that should
execute after
n seconds.
Constructs an
after-seconds% instance that when placed
inside a
message-router construct causes the supplied
thunk to execute after
n seconds.
The
restarter% instance represents a restart strategy.
Constructs a
restarter% instance that, when supplied to a
#:restart-on-exit argument, attempts to restart the process
every
seconds seconds. The
retry argument specifies how
many times to attempt to restart the process before giving up. If the
process stays alive for
(* 2 seconds) seconds, the attempted retries
count is reset to
0. The
on-final-fail thunk is
called when the number of retries is exceeded.
distributed-launch-path : path?
Contains the local path to the distributed places launcher. The
distributed places launcher is the bootstrap file that launches the
message router on a new node.
Returns the path to the ssh binary on the local system in string form.
Returns the path to the currently executing Racket binary on the local system.
Returns the path to the distributed places launch file.
The function can take an optional argument specifying the path to the collects directory.
Spawns a node in the background using a Racket thread and returns a channel that becomes ready with a
remote-node%
once the node has spawned successfully.
Sends msg over the channel ch.
Returns a message received on the channel ch.
Creates and returns a new place channel connection to a named place at
dest-list.
The
dest-list argument is a list of a remote-hostname, a remote-port, and a named-place name.
The channel
ch should be a connection to a
message-router.
Sends a message to a message router over mrch channel asking the message router to spawn a new node
at host listening on port listen-port. If the #:solo keyword argument is supplied
the new node is not folded into the complete network with other nodes in the distributed system.
Sends a message to a message router over mrch channel asking the message router to spawn
a named place at dest named name. The place is spawned at the remote node by calling
dynamic-place with module-path path and function func. The dest parameter should be a
list of remote-hostname and remote-port.
Sends a message to a message router over mrch channel asking the message router to create a new
connection to the named place named name at dest.
The dest parameter should be a list of remote-hostname and remote-port.
Starts a message router in a Racket thread connected to
nodes, listening on port
listen-port, and
returns a
channel? connection to the message router.
The default port for the distributed places message router.
Returns the first message received on
ch that has the type
type. Messages are lists and their type is the first
item of the list which should be a
symbol?. Messages of other types that are received are queued for later
get requests.
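The queueing behavior described above can be modeled in a few lines of plain Racket. This is a hedged sketch, not the library's implementation: typed-get is a hypothetical name, a list (incoming) stands in for the underlying channel, and a second list (pending) stands in for the queue of messages set aside by earlier calls.

```racket
#lang racket/base
(require racket/list)

;; Hypothetical model of a typed get.
;; Returns three values: the first message whose type (its first item)
;; is `type`, or #f if none is available; the unread incoming messages;
;; and the updated queue of skipped messages.
(define (typed-get type incoming pending)
  ;; First look among messages that earlier calls skipped over.
  (define queued (findf (lambda (m) (eq? (car m) type)) pending))
  (cond
    [queued (values queued incoming (remq queued pending))]
    [else
     (let loop ([in incoming] [pend pending])
       (cond
         [(null? in) (values #f in pend)]
         [(eq? (car (car in)) type) (values (car in) (cdr in) pend)]
         ;; Wrong type: queue the message for a later get request.
         [else (loop (cdr in) (append pend (list (car in))))]))]))

(define-values (msg rest-in rest-pending)
  (typed-get 'b '((a 1) (b 2) (a 3)) '()))
;; msg is '(b 2); '(a 1) now waits in rest-pending for a later get of type 'a
```

A later call such as (typed-get 'a rest-in rest-pending) is served from the queue first, so message order within a type is preserved.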
Writes datum to port and then flushes port.
1 Define Remote Server
The define-named-remote-server form takes an identifier and a
list of custom expressions as its arguments. From the identifier a
function is created by prepending the make- prefix. This
procedure takes a single argument a place-channel. In the
example below, the make-tuple-server identifier is the
place-function-name given to the
supervise-named-dynamic-place-at form to spawn an rpc server.
The server created by the make-tuple-server procedure sits in
a loop waiting for rpc requests from the define-rpc functions
documented below.
Expands to a
define, which is closed over by the
define-rpc functions
to form local state.
(define-rpc (id args ...) body ...)
Expands to a client rpc function
name-id which sends
id and
args ... to
the rpc server
rpc-place and waits for a response.
(define (name-id rpc-place args ...) body)
(define-cast (id args ...) body ...)
Expands to a client rpc function
name-id which sends
id and
args ... to
the rpc server
rpc-place but does not receive any response. A cast is a one-way communication
technique.
(define (name-id rpc-place args ...) body)
The
define-state custom form translates into a simple
define form, which is closed over by the define-rpc
forms.
The define-rpc form is expanded into two parts. The first
part is the client stubs that call the rpc functions. The client
function name is formed by concatenating the
define-named-remote-server identifier, tuple-server,
with the RPC function name set to form tuple-server-set.
The RPC client functions take a destination argument which is a
remote-connection% descriptor and then the RPC function
arguments. The RPC client function sends the RPC function name,
set, and the RPC arguments to the destination by calling an
internal function named-place-channel-put. The RPC client
then calls named-place-channel-get to wait for the RPC
response.
The second expansion part of define-rpc is the server
implementation of the RPC call. The server is implemented by a match
expression inside the make-tuple-server function. The match
clause for tuple-server-set matches on messages beginning
with the 'set symbol. The server executes the RPC call with
the communicated arguments and sends the result back to the RPC
client.
The define-cast form is similar to the define-rpc form
except there is no reply message from the server to client.
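The two halves of the expansion described above can be approximated by hand. The sketch below is an assumption-laden analogue, not the actual macro output: it uses place-channel-put and place-channel-get in place of the internal named-place-channel-put and named-place-channel-get, runs the server in a thread of the current place instead of a remote named place, and introduces the hypothetical names make-tuple-server-sketch, tuple-server-get, tuple-server-hello, and a 'stop message that exists only so the sketch can shut down.

```racket
#lang racket/base
(require racket/match
         racket/place)

;; Server side: local state plus one match clause per declared call.
(define (make-tuple-server-sketch ch)
  (define h (make-hash))                  ; plays the role of define-state
  (let loop ()
    (match (place-channel-get ch)
      [(list 'set k v)                    ; rpc: run the body, send the result
       (hash-set! h k v)
       (place-channel-put ch v)
       (loop)]
      [(list 'get k)                      ; rpc: reply with the stored value
       (place-channel-put ch (hash-ref h k #f))
       (loop)]
      [(list 'hello)                      ; cast: no reply is sent
       (loop)]
      [(list 'stop) (void)])))            ; sketch-only shutdown message

;; Client stubs for rpc calls: send the name and arguments, await the reply.
(define (tuple-server-set ch k v)
  (place-channel-put ch (list 'set k v))
  (place-channel-get ch))
(define (tuple-server-get ch k)
  (place-channel-put ch (list 'get k))
  (place-channel-get ch))
;; Client stub for a cast: send the message and return without waiting.
(define (tuple-server-hello ch)
  (place-channel-put ch (list 'hello)))

;; Demonstration: both channel ends live in the current place.
(define-values (client-end server-end) (place-channel))
(define server (thread (lambda () (make-tuple-server-sketch server-end))))
(tuple-server-set client-end "user0" 100)
(tuple-server-hello client-end)
(tuple-server-get client-end "user0")
```

Keeping the state inside the server loop means only the server thread ever touches the hash table, which is why the clients need nothing beyond a channel to the server.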
2 Async Bidirectional Channels
Creates and returns an opaque structure, which is the async bidirectional channel.
A predicate that returns #t for async bidirectional channels.
Returns the next available message from the async bidirectional channel ch.
Sends message msg to the remote end of the async bidirectional channel ch.
3 Distributed Places MPI
The communicator struct
rmpi-comm contains the rmpi process
rank
id, the quantity of processes in the communicator group,
cnt, and a vector of place-channels, one for each process in
the group.
Takes a rmpi communicator structure, comm, and returns the node id of the RMPI
process.
Takes a rmpi communicator structure, comm, and returns the count of the RMPI
processes in the communicator group.
Sends val to destination rmpi process number dest
using the RMPI communicator structure comm.
Receives a message from source rmpi process number src
using the RMPI communicator structure comm.
Creates the
rmpi-comm structure instance using the named
place’s original place-channel
ch. In addition to the
communicator structure,
rmpi-init returns a list of initial
arguments and the original place-channel
ch wrapped in a
named-place-typed-channel%. The
named-place-typed-channel% wrapper allows for the reception
of list messages typed by an initial symbol.
Broadcasts
val from
src to all rmpi processes in
the communication group using a hypercube algorithm. Receiving
processes call
(rmpi-broadcast comm src).
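To make the hypercube idea concrete, the sketch below computes one possible message schedule for such a broadcast. It is a model under stated assumptions, not the code behind rmpi-broadcast: broadcast-schedule is a hypothetical name, ranks are taken relative to src, and in each round every process that already holds the value forwards it to the process whose relative rank is higher by the current power of two.

```racket
#lang racket/base

;; Hypothetical model of a hypercube-style broadcast among `cnt` processes.
;; In the round with offset `mask` (1, 2, 4, ...), each process with
;; relative rank below `mask` sends to relative rank + mask, if it exists.
;; Returns a list of rounds; each round is a list of (sender receiver) pairs.
(define (broadcast-schedule cnt src)
  (let loop ([mask 1] [rounds '()])
    (if (>= mask cnt)
        (reverse rounds)
        (loop (* 2 mask)
              (cons (for/list ([rel (in-range mask)]
                               #:when (< (+ rel mask) cnt))
                      ;; Translate relative ranks back to process ids.
                      (list (modulo (+ rel src) cnt)
                            (modulo (+ rel mask src) cnt)))
                    rounds)))))

(broadcast-schedule 4 0) ; two rounds: 0 sends to 1, then 0 to 2 and 1 to 3
```

The number of holders doubles each round, so the value reaches all processes in about log2(cnt) rounds instead of cnt - 1 sequential sends from the source.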
Reduces val using the op operator to dest
rmpi node using a hypercube algorithm.
Introduces a synchronization barrier for all rmpi processes in the
communication group comm.
Reduces val using the op operator to rmpi node
0 and then broadcasts the reduced value to all nodes in the
communication group.
Partitions
num into
rmpi-cnt equal pieces and
returns the offset and length for the
rmpi-idth piece.
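The arithmetic behind such a partition can be sketched as follows. partition-sketch is a hypothetical helper, not the library function, and the treatment of a remainder is an assumption: when num is not divisible by the piece count, the first (remainder num cnt) pieces each receive one extra item.

```racket
#lang racket/base

;; Hypothetical helper: split `num` items into `cnt` contiguous pieces and
;; return the offset and length of piece `id` (0-based) as two values.
(define (partition-sketch num cnt id)
  (define-values (q r) (quotient/remainder num cnt))
  (values (+ (* id q) (min id r))      ; offset: total length of earlier pieces
          (+ q (if (< id r) 1 0))))    ; length: one extra for the first r pieces

;; Offset and length of each of 4 pieces covering 10 items.
(for/list ([id (in-range 4)])
  (call-with-values (lambda () (partition-sketch 10 4 id)) list))
```

For 10 items over 4 pieces this gives lengths 3, 3, 2, 2 at offsets 0, 3, 6, 8, so the pieces are contiguous and cover every item exactly once.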
Builds a hash from keywords to keyword arguments for use with the
rmpi-launch function.
Launches distributed places nodes running #:mpi-func in
#:mpi-module with #:mpi-args. The config is a
list of node configs, where each node config consists of a hostname,
port, named place symbol and rmpi id number, followed by an
optional hash of keyword #:racket-path,
#:distributed-launch-path, #:mpi-module,
#:mpi-func, and #:mpi-args to keyword arguments.
Missing optional keyword arguments will be taken from the
default-node-config hash of keyword arguments.
Rendezvous with the
rmpi-launch, using the
tc
returned by
rmpi-launch, to indicate that the RMPI module
is done executing and that
rmpi-launch can return control
to its caller.
Example:
> (rmpi-launch
    (rmpi-build-default-config
      #:racket-path "/tmp/mplt/bin/racket"
      #:distributed-launch-path (build-distributed-launch-path
                                  "/tmp/mplt/collects")
      #:mpi-module "/tmp/mplt/kmeans.rkt"
      #:mpi-func 'kmeans-place
      #:mpi-args (list "/tmp/mplt/color100.bin" #t 100
                       9 10 1e-07))

    (list (list "nodea.example.com" 6340 'kmeans_0 0)
          (list "nodeb.example.com" 6340 'kmeans_1 1)
          (list "nodec.example.com" 6340 'kmeans_2 2)
          (list "noded.example.com" 6340 'kmeans_3 3)))