The goal of interprocess is to synchronize concurrent R processes.
Allows R to access low-level operating system mechanisms for performing atomic operations on shared data structures. Mutexes provide shared and exclusive locks. Semaphores act as counters. Message queues move text strings from one process to another. All these interprocess communication (IPC) tools can optionally block with or without a timeout.
Works cross-platform, including Windows, macOS, and Linux, and can be used to synchronize a mixture of R sessions and other types of processes if needed.
Implemented using the Boost C++ library.
# Install the latest stable version from CRAN:
install.packages("interprocess")
# Or the development version from GitHub:
install.packages("pak")
pak::pak("cmmr/interprocess")
Mutexes, semaphores, and message queues are managed by the operating system. Any process that knows the resource’s name can access it. Therefore, sharing the resource name (a short text string) amongst cooperating processes enables communication and synchronization.
These names can be any alphanumeric string that starts with a letter and is no longer than 250 characters. The mutex(), semaphore(), and queue() functions will default to generating a unique identifier, or you can provide a custom or pre-existing one with the name parameter. If the resource is associated with a specific file or directory, the file parameter can be used to derive a name from normalizing and hashing that path.
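The naming rule above can be checked before a name is shared between processes. The following is a minimal sketch in base R; the helper is_valid_ipc_name() is hypothetical and only mirrors the rule as stated here (letters and digits, leading letter, at most 250 characters). It is not the package's own validation.

```r
# Hypothetical helper (not part of interprocess): checks a candidate
# resource name against the rule described above.
is_valid_ipc_name <- function(x) {
  is.character(x) && length(x) == 1L && !is.na(x) &&
    nchar(x) <= 250L &&                     # no longer than 250 characters
    grepl("^[A-Za-z][A-Za-z0-9]*$", x)      # starts with a letter, alphanumeric only
}

is_valid_ipc_name("mySemaphore")
#> [1] TRUE
is_valid_ipc_name("1stQueue")    # starts with a digit
#> [1] FALSE
is_valid_ipc_name("job-queue")   # contains a non-alphanumeric character
#> [1] FALSE
```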
Setting cleanup = TRUE will automatically remove the resource when the R session exits. Otherwise, the resource will persist until $remove() is called or the operating system is restarted.
An exclusive lock is acquired by default. For a shared lock, use shared = TRUE.
tmp <- tempfile()
mut <- interprocess::mutex(file = tmp)

mut$name
#> [1] "S6qYUQK2SwA"

with(mut, writeLines('Important Data', tmp))
with(mut, shared = TRUE, readLines(tmp))
#> [1] "Important Data"

mut$remove()
unlink(tmp)
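One situation where the exclusive lock matters is a read-modify-write cycle on a shared file. The sketch below uses only the calls shown above (mutex(file = ), with(), and $remove()). The counter file and the increment_counter() helper are hypothetical, and the loop stands in for what would normally be several separate R processes running the same helper.

```r
# Hypothetical shared counter stored in a file.
counter_file <- tempfile()
writeLines("0", counter_file)

# Every process that derives its mutex from the same path gets the same lock.
mut <- interprocess::mutex(file = counter_file)

# Hypothetical helper: read, increment, and write back under an exclusive lock,
# so no other process can interleave between the read and the write.
increment_counter <- function() {
  with(mut, {
    n <- as.integer(readLines(counter_file))
    writeLines(as.character(n + 1L), counter_file)
  })
}

for (i in 1:3) increment_counter()

# Readers only need a shared lock.
current <- with(mut, shared = TRUE, as.integer(readLines(counter_file)))

mut$remove()
unlink(counter_file)
```

Holding the lock across both the read and the write is the point of the design: locking each step separately would still allow two processes to read the same value and overwrite each other's update.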
The semaphore’s value can be incremented with $post() and decremented with $wait(). The initial value can also be set in the constructor.
sem <- interprocess::semaphore('mySemaphore', value = 1)

sem$name
#> [1] "mySemaphore"

sem$post()

sem$wait(timeout_ms = 0)
#> [1] TRUE
sem$wait(timeout_ms = 0)
#> [1] TRUE
sem$wait(timeout_ms = 0)
#> [1] FALSE

sem$remove()
The constructor’s max_count and max_nchar parameters determine how much memory is allocated for the queue.
mq <- interprocess::queue(max_count = 2, max_nchar = 5)

mq$name
#> [1] "akRKb4iymMcAFV"

mq$send('Hello')
mq$send('Hi', priority = 1)

mq$receive(timeout_ms = 0)
#> [1] "Hi"
mq$receive(timeout_ms = 0)
#> [1] "Hello"
mq$receive(timeout_ms = 0)
#> [1] NULL

mq$remove()
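Because $receive(timeout_ms = 0) returns NULL once the queue is empty, a consumer can drain everything that is currently queued with a simple loop. The sketch below assumes a queue sized for short job names. The job names and queue dimensions are illustrative, and in practice the sender and receiver would be different processes sharing mq$name.

```r
# Room for up to 10 messages of at most 20 characters each (illustrative sizes).
mq <- interprocess::queue(max_count = 10, max_nchar = 20)

# Producer side: enqueue a few hypothetical job names.
for (job in c("alpha", "beta", "gamma")) mq$send(job)

# Consumer side: poll without blocking until the queue reports empty (NULL).
received <- character()
repeat {
  msg <- mq$receive(timeout_ms = 0)
  if (is.null(msg)) break
  received <- c(received, msg)
}

mq$remove()
```

A consumer that should wait for new work, rather than stop when the queue is momentarily empty, could pass a positive timeout_ms instead of 0.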