Commit 0f7d6eae authored by Éric Thiébaut's avatar Éric Thiébaut
Browse files

Use methods implemented for remote objects with remote mirrors

parent 86eee86f
......@@ -99,6 +99,8 @@ libtao_la_SOURCES = \
locks.c \
logmsg.c \
remote-mirrors.c \
remote-objects.c \
rwlocked-objects.c \
options.c \
pixels.c \
preprocessing.c \
......
......@@ -230,118 +230,20 @@ double *tao_remote_mirror_get_actual_commands(
return (obj == NULL) ? NULL : remote_mirror_get_act_cmds(obj);
}
// Inline function to check whether a client should wait for the remote server
// to be ready to accept commands.
//
// NOTE: Since communication is asynchronous, we can assume that a server may
// receive at most one command while initializing.
static inline bool wait_for_command(
tao_status status,
const tao_remote_mirror* obj)
{
return (status == TAO_OK && is_alive(obj)
&& obj->base.command != TAO_COMMAND_NONE);
}
// Inline function to check whether a client should wait for the requested
// frame to be available.
static inline bool wait_for_frame(
tao_status status,
const tao_remote_mirror* obj,
tao_serial serial)
{
return (status == TAO_OK && is_alive(obj)
&& obj->base.serial < serial);
}
// Attempt to lock the remote instance and to wait for its server to accept
// commands or to be killed. On return, `*locked` indicates whether the remote
// instance has been locked by the caller. It the returned status is `TAO_OK`,
// the lock has been acquired and the server shall be either idle or killed.
static tao_status lock_and_wait_for_command(
bool* locked,
tao_remote_mirror* obj,
double secs)
{
// Convert seconds to absolute time, then lock resources and wait until
// the server be ready for a new command.
tao_status status;
tao_time abstime;
switch (tao_get_absolute_timeout(&abstime, secs)) {
case TAO_TIMEOUT_NEVER:
status = tao_remote_mirror_lock(obj);
*locked = (status == TAO_OK);
while (wait_for_command(status, obj)) {
status = tao_remote_mirror_wait_condition(obj);
}
break;
case TAO_TIMEOUT_FUTURE:
status = tao_remote_mirror_abstimed_lock(obj, &abstime);
*locked = (status == TAO_OK);
while (wait_for_command(status, obj)) {
status = tao_remote_mirror_abstimed_wait_condition(obj, &abstime);
}
break;
case TAO_TIMEOUT_NOW:
status = tao_remote_mirror_try_lock(obj);
*locked = (status == TAO_OK);
if (wait_for_command(status, obj)) {
status = TAO_TIMEOUT;
}
break;
case TAO_TIMEOUT_PAST:
*locked = false;
status = TAO_TIMEOUT;
break;
default: // TAO_TIMEOUT_ERROR
*locked = false;
status = TAO_ERROR;
break;
}
return status;
}
static inline tao_status send_command(
const char* func,
tao_remote_mirror* obj,
tao_command command,
double secs)
{
if (obj == NULL) {
tao_store_error(func, TAO_BAD_ADDRESS);
return TAO_ERROR;
}
bool locked;
tao_status status = lock_and_wait_for_command(&locked, obj, secs);
if (status == TAO_OK) {
obj->base.command = command;
if (tao_remote_mirror_broadcast_condition(obj) != TAO_OK) {
obj->base.command = TAO_COMMAND_NONE;
status = TAO_ERROR;
}
}
if (locked && tao_remote_mirror_unlock(obj) != TAO_OK) {
status = TAO_ERROR;
}
return status;
}
tao_status tao_remote_mirror_kill(
tao_remote_mirror* obj,
double secs)
{
tao_status status = send_command(__func__, obj, TAO_COMMAND_KILL, secs);
if (status == TAO_TIMEOUT && !is_alive(obj)) {
status = TAO_OK;
}
return status;
return tao_remote_object_send_simple_command(
tao_remote_object_cast(obj), TAO_COMMAND_KILL, secs);
}
tao_status tao_remote_mirror_reset(
tao_remote_mirror* obj,
double secs)
{
return send_command(__func__, obj, TAO_COMMAND_RESET, secs);
return tao_remote_object_send_simple_command(
tao_remote_object_cast(obj), TAO_COMMAND_RESET, secs);
}
tao_status tao_remote_mirror_set_reference(
......@@ -364,8 +266,9 @@ tao_status tao_remote_mirror_set_reference(
double* dest = remote_mirror_get_refs(obj);
// Wait until server be ready for a new command.
bool locked;
tao_status status = lock_and_wait_for_command(&locked, obj, secs);
int locked;
tao_status status = tao_remote_object_lock_for_command(
&locked, tao_remote_object_cast(obj), secs);
// Copy reference commands.
if (status == TAO_OK) {
......@@ -401,8 +304,9 @@ tao_status tao_remote_mirror_send_commands(
double* dest = remote_mirror_get_req_cmds(obj);
// Wait until server ready for a new command.
bool locked;
tao_status status = lock_and_wait_for_command(&locked, obj, secs);
int locked;
tao_status status = tao_remote_object_lock_for_command(
&locked, tao_remote_object_cast(obj), secs);
// Get next serial number.
if (status == TAO_OK) {
......@@ -440,94 +344,8 @@ tao_serial tao_remote_mirror_wait_data(
tao_serial serial,
double secs)
{
// Check argument(s) and figure out which frame to wait for.
if (obj == NULL) {
tao_store_error(__func__, TAO_BAD_ADDRESS);
goto error;
}
tao_serial last = obj->base.serial; // NOTE: ok because atomic
if (last < 0) {
tao_store_error(__func__, TAO_CORRUPTED);
goto error;
}
if (serial <= 0) {
// Manage to wait for next frame.
serial = last + 1;
}
tao_status status = TAO_OK;
if (serial > last) {
// Convert seconds to absolute time, then wait for next data frame(s)
// until a sufficient serial number or the time limit is reached.
tao_time abstime;
bool locked = false;
switch (tao_get_absolute_timeout(&abstime, secs)) {
case TAO_TIMEOUT_NEVER:
status = tao_remote_mirror_lock(obj);
locked = (status == TAO_OK);
while (wait_for_frame(status, obj, serial)) {
status = tao_remote_mirror_wait_condition(obj);
}
break;
case TAO_TIMEOUT_FUTURE:
status = tao_remote_mirror_abstimed_lock(obj, &abstime);
locked = (status == TAO_OK);
while (wait_for_frame(status, obj, serial)) {
status = tao_remote_mirror_abstimed_wait_condition(
obj, &abstime);
}
break;
case TAO_TIMEOUT_NOW:
status = tao_remote_mirror_try_lock(obj);
locked = (status == TAO_OK);
if (wait_for_frame(status, obj, serial)) {
status = TAO_TIMEOUT;
}
break;
case TAO_TIMEOUT_PAST:
// Return a value indicating a timeout.
status = TAO_TIMEOUT;
break;
default:
// The only remaining possibility is that `tao_get_absolute_timeout`
// returned an error.
status = TAO_ERROR;
break;
}
// Unlock ressources.
if (locked && tao_remote_mirror_unlock(obj) != TAO_OK) {
status = TAO_ERROR;
}
}
// If no problems occurred, the result is the serial number of the
// requested frame; otherwise, it is a nonpositive number indicating the
// problem.
if (status == TAO_ERROR) {
// An error has occured.
error:
return -3;
}
last = obj->base.serial; // NOTE: ok because atomic
if (last < 0) {
tao_store_error(__func__, TAO_CORRUPTED);
goto error;
}
if (serial <= last) {
if (serial > last - obj->base.nbufs) {
// The requested frame is available in the cyclic list of buffers.
return serial;
} else {
// The requested frame is too old.
return -1;
}
}
if (! is_alive(obj)) {
// The requested frame will never be available because the server
// is no longer alive.
return -2;
}
// A timeout occurred.
return 0;
return tao_remote_object_wait_serial(
tao_remote_object_cast(obj), serial, secs);
}
// FIXME: check running?
......
// remote-objects.c -
//
// Management of basic shared objects used to communicate with remote server.
//
//-----------------------------------------------------------------------------
//
// This file if part of TAO real-time software licensed under the MIT license
// (https://git-cral.univ-lyon1.fr/tao/tao-rt).
//
// Copyright (C) 2018-2022, Éric Thiébaut.
#include <string.h>
#include "tao-basics.h"
#include "tao-errors.h"
#include "tao-macros.h"
#include "tao-generic.h"
#include "tao-shared-objects.h"
#include "tao-remote-objects-private.h"
const char* tao_state_get_name(
tao_state state)
{
switch (state) {
case TAO_STATE_INITIALIZING:
return "initializing";
case TAO_STATE_WAITING:
return "waiting";
case TAO_STATE_STARTING:
return "starting";
case TAO_STATE_WORKING:
return "working";
case TAO_STATE_STOPPING:
return "stopping";
case TAO_STATE_ABORTING:
return "aborting";
case TAO_STATE_KILLED:
return "killed";
default:
return "unknown";
}
}
tao_remote_object* tao_remote_object_create(
const char* owner,
tao_object_type type,
long nbufs,
long offset,
long stride,
size_t size,
unsigned flags)
{
long len = TAO_STRLEN(owner);
if (len < 1 || len >= TAO_OWNER_SIZE) {
tao_store_error(__func__, TAO_BAD_NAME);
return NULL;
}
if ((type & TAO_SHARED_SUPERTYPE_MASK) != TAO_REMOTE_OBJECT) {
tao_store_error(__func__, TAO_BAD_TYPE);
return NULL;
}
if (nbufs < 0 || offset < sizeof(tao_remote_object) || stride < 0 ||
size < offset + nbufs*stride) {
tao_store_error(__func__, TAO_BAD_SIZE);
return NULL;
}
tao_remote_object* obj = (tao_remote_object*)tao_shared_object_create(
type, size, flags);
if (obj == NULL) {
return NULL;
}
for (long i = 0; i < len; ++i) {
((char*)obj->owner)[i] = owner[i];
}
((char*)obj->owner)[len] = '\0';
tao_forced_store(&obj->nbufs, nbufs);
tao_forced_store(&obj->offset, offset);
tao_forced_store(&obj->stride, stride);
obj->serial = 0;
obj->state = TAO_STATE_INITIALIZING;
obj->command = TAO_COMMAND_NONE;
return obj;
}
tao_remote_object* tao_remote_object_attach(
tao_shmid shmid)
{
tao_shared_object* obj = tao_shared_object_attach(shmid);
if (obj != NULL &&
(obj->type & TAO_SHARED_SUPERTYPE_MASK) == TAO_REMOTE_OBJECT) {
return (tao_remote_object*)obj;
}
tao_shared_object_detach(obj);
tao_store_error(__func__, TAO_BAD_TYPE);
return NULL;
}
// Basic methods.
#define TYPE remote_object
#define IS_REMOTE_OBJECT 1
#include "./shared-methods.c"
// Check whether the server owning the remote object is alive.
//
// NOTE: Since server state is an *atomic* variable, the caller may not have
// locked the object.
static inline bool is_alive(
const tao_remote_object* obj)
{
return (obj->state != TAO_STATE_KILLED);
}
// Inline function to check whether a client should wait for the remote server
// to be ready to accept commands.
//
// NOTE: Since communication is asynchronous, we can assume that a server may
// receive at most one command while initializing.
static inline bool wait_for_command(
tao_status status,
const tao_remote_object* obj)
{
return (status == TAO_OK && is_alive(obj)
&& obj->command != TAO_COMMAND_NONE);
}
// Inline function to check whether a client should wait for the requested
// frame to be available.
static inline bool wait_for_serial(
tao_status status,
const tao_remote_object* obj,
tao_serial serial)
{
return (status == TAO_OK && is_alive(obj)
&& obj->serial < serial);
}
tao_serial tao_remote_object_wait_serial(
tao_remote_object* obj,
tao_serial serial,
double secs)
{
// Check argument(s) and figure out which buffer to wait for.
if (obj == NULL) {
tao_store_error(__func__, TAO_BAD_ADDRESS);
goto error;
}
tao_serial last = obj->serial; // NOTE: ok because atomic
if (last < 0) {
tao_store_error(__func__, TAO_CORRUPTED);
goto error;
}
if (serial <= 0) {
// Manage to wait for next buffer.
serial = last + 1;
}
tao_status status = TAO_OK;
if (serial > last) {
// Convert seconds to absolute time, then wait for next buffers until a
// sufficient serial number is reached or the time limit is exhausted.
tao_time abstime;
bool locked = false;
switch (tao_get_absolute_timeout(&abstime, secs)) {
case TAO_TIMEOUT_NEVER:
status = tao_remote_object_lock(obj);
locked = (status == TAO_OK);
while (wait_for_serial(status, obj, serial)) {
status = tao_remote_object_wait_condition(obj);
}
break;
case TAO_TIMEOUT_FUTURE:
status = tao_remote_object_abstimed_lock(obj, &abstime);
locked = (status == TAO_OK);
while (wait_for_serial(status, obj, serial)) {
status = tao_remote_object_abstimed_wait_condition(
obj, &abstime);
}
break;
case TAO_TIMEOUT_NOW:
status = tao_remote_object_try_lock(obj);
locked = (status == TAO_OK);
if (wait_for_serial(status, obj, serial)) {
status = TAO_TIMEOUT;
}
break;
case TAO_TIMEOUT_PAST:
// Return a value indicating a timeout.
status = TAO_TIMEOUT;
break;
default:
// The only remaining possibility is that
// `tao_get_absolute_timeout` returned an error
// (TAO_TIMEOUT_ERROR).
status = TAO_ERROR;
break;
}
// Unlock ressources.
if (locked && tao_remote_object_unlock(obj) != TAO_OK) {
status = TAO_ERROR;
}
}
// If no problems occurred, the result is the serial number of the
// requested buffer; otherwise, it is a nonpositive number indicating the
// problem. The serial number of the last output buffer is loaded again to
// maximize the odds of success of the operation.
if (status != TAO_ERROR) {
last = obj->serial; // NOTE: ok because atomic
if (last < 0) {
tao_store_error(__func__, TAO_CORRUPTED);
goto error;
}
if (serial <= last) {
if (serial > last - obj->nbufs) {
// The requested buffer is available in the cyclic list of
// buffers.
return serial;
} else {
// The requested buffer is too old.
return -1;
}
} else if (is_alive(obj)) {
// A timeout occurred.
return 0;
} else {
// The requested buffer will never be available because the server
// is no longer alive.
return -2;
}
}
// An error has occured.
error:
return -3;
}
tao_status tao_remote_object_lock_for_command(
int* locked,
tao_remote_object* obj,
double secs)
{
// Minimal check.
if (obj == NULL || locked == NULL) {
tao_store_error(__func__, TAO_BAD_ADDRESS);
if (locked != NULL) {
*locked = false;
}
return TAO_ERROR;
}
// Convert seconds to absolute time, then lock resources and wait until
// the server be ready for a new command.
tao_status status;
tao_time abstime;
switch (tao_get_absolute_timeout(&abstime, secs)) {
case TAO_TIMEOUT_NEVER:
status = tao_remote_object_lock(obj);
*locked = (status == TAO_OK);
while (wait_for_command(status, obj)) {
status = tao_remote_object_wait_condition(obj);
}
break;
case TAO_TIMEOUT_FUTURE:
status = tao_remote_object_abstimed_lock(obj, &abstime);
*locked = (status == TAO_OK);
while (wait_for_command(status, obj)) {
status = tao_remote_object_abstimed_wait_condition(obj, &abstime);
}
break;
case TAO_TIMEOUT_NOW:
status = tao_remote_object_try_lock(obj);
*locked = (status == TAO_OK);
if (wait_for_command(status, obj)) {
status = TAO_TIMEOUT;
}
break;
case TAO_TIMEOUT_PAST:
*locked = false;
status = TAO_TIMEOUT;
break;
default: // TAO_TIMEOUT_ERROR
*locked = false;
status = TAO_ERROR;
break;
}
return status;
}
tao_status tao_remote_object_send_simple_command(
tao_remote_object* obj,
tao_command command,
double secs)
{
if (obj == NULL) {
tao_store_error(__func__, TAO_BAD_ADDRESS);
return TAO_ERROR;
}
if (command == TAO_COMMAND_KILL) {
if (! is_alive(obj)) {
// Assume success for a "kill" command when the server is no longer
// alive.
return TAO_OK;
}
}
int locked;
tao_status status = tao_remote_object_lock_for_command(&locked, obj, secs);
if (status == TAO_OK) {
obj->command = command;
if (tao_remote_object_broadcast_condition(obj) != TAO_OK) {
obj->command = TAO_COMMAND_NONE;
status = TAO_ERROR;
}
}
if (locked && tao_remote_object_unlock(obj) != TAO_OK) {
status = TAO_ERROR;
}
if (command == TAO_COMMAND_KILL) {
// Assume success upon timout if the server is no longer alive.
if (status == TAO_TIMEOUT && !is_alive(obj)) {
status = TAO_OK;
}
}
return status;
}
tao_status tao_remote_object_kill(
tao_remote_object* obj,
double secs)
{
// NOTE: Checking of arguments is done by called functions.
return tao_remote_object_send_simple_command(obj, TAO_COMMAND_KILL, secs);
}
// rwlocked-objects.c -
//
// Management of basic shared objects with read/write restricted access.
//
//-----------------------------------------------------------------------------
//
// This file if part of TAO real-time software licensed under the MIT license
// (https://git-cral.univ-lyon1.fr/tao/tao-rt).
//
// Copyright (C) 2018-2022, Éric Thiébaut.
#include "tao-basics.h"
#include "tao-errors.h"
#include "tao-macros.h"
#include "tao-generic.h"
#include "tao-shared-objects.h"
#include "tao-rwlocked-objects-private.h"
tao_rwlocked_object* tao_rwlocked_object_create(
uint32_t type,
size_t size,
unsigned flags)
{
if ((type & TAO_SHARED_SUPERTYPE_MASK) != TAO_RWLOCKED_OBJECT) {
tao_store_error(__func__, TAO_BAD_TYPE);
return NULL;
}
if (size < sizeof(tao_rwlocked_object)) {
tao_store_error(__func__, TAO_BAD_SIZE);
return NULL;
}
tao_rwlocked_object* obj = (tao_rwlocked_object*)tao_shared_object_create(
type, size, flags);
if (obj == NULL) {
return NULL;
}
return obj;
}
tao_rwlocked_object* tao_rwlocked_object_attach(