Commit 1993cf49 authored by Éric Thiébaut's avatar Éric Thiébaut
Browse files

Provide driver to run mirror server loop

parent a8026c92
......@@ -10,6 +10,7 @@
// Copyright (C) 2019-2021, Éric Thiébaut.
#include "tao-mirrors-private.h"
#include "tao-config.h"
#include "tao-generic.h"
#include <assert.h>
......@@ -67,27 +68,31 @@ static inline size_t aligned(size_t size)
return TAO_ROUND_UP(size, TAO_ALIGNMENT);
}
#define address(ptr, off) ((void*)(((uint8_t*)(ptr)) + (off)))
//-----------------------------------------------------------------------------
// REMOTE DEFORMABLE MIRROR
static inline long* remote_mirror_inds(
const tao_remote_mirror* dm)
{
return address(dm, dm->inds_offset);
return TAO_REMOTE_MIRROR_INDS(dm);
}
static inline double* remote_mirror_refs(
const tao_remote_mirror* dm)
{
return address(dm, dm->refs_offset);
return TAO_REMOTE_MIRROR_REFS(dm);
}
static inline double* remote_mirror_req_cmds(
const tao_remote_mirror* dm)
{
return TAO_REMOTE_MIRROR_REQ_CMDS(dm);
}
static inline double* remote_mirror_cmds(
static inline double* remote_mirror_act_cmds(
const tao_remote_mirror* dm)
{
return address(dm, dm->cmds_offset);
return TAO_REMOTE_MIRROR_ACT_CMDS(dm);
}
tao_remote_mirror* tao_create_remote_mirror(
......@@ -125,13 +130,13 @@ tao_remote_mirror* tao_create_remote_mirror(
}
// Compute sizes and offsets.
size_t inds_offset = TAO_ROUND_UP(sizeof(tao_remote_mirror), sizeof(long));
size_t inds_size = ninds*sizeof(long);
size_t refs_offset = TAO_ROUND_UP(inds_offset + inds_size, sizeof(double));
size_t refs_size = nacts*sizeof(double);
size_t cmds_offset = TAO_ROUND_UP(refs_offset + refs_size, sizeof(double));
size_t cmds_size = nacts*sizeof(double);
size_t size = cmds_offset + cmds_size;
size_t inds_offset = TAO_ROUND_UP(sizeof(tao_remote_mirror), sizeof(long));
size_t refs_offset = TAO_ROUND_UP(inds_offset + inds_size, sizeof(double));
size_t req_cmds_offset = TAO_ROUND_UP(refs_offset + cmds_size, sizeof(double));
size_t act_cmds_offset = TAO_ROUND_UP(req_cmds_offset + cmds_size, sizeof(double));
size_t size = act_cmds_offset + cmds_size;
// Allocate shared memory object.
tao_remote_mirror* dm = (tao_remote_mirror*)tao_create_shared_object(
......@@ -146,12 +151,13 @@ tao_remote_mirror* tao_create_remote_mirror(
dm->mark = 0;
dm->task = TAO_TASK_IDLE;
tao_forced_store(&dm->telemetry, tao_get_shared_mirror_data_shmid(data));
tao_forced_store(&dm->nacts, nacts);
tao_forced_store(&dm->dims[0], dim1);
tao_forced_store(&dm->dims[1], dim2);
tao_forced_store(&dm->inds_offset, inds_offset);
tao_forced_store(&dm->refs_offset, refs_offset);
tao_forced_store(&dm->cmds_offset, cmds_offset);
tao_forced_store(&dm->nacts, nacts);
tao_forced_store(&dm->dims[0], dim1);
tao_forced_store(&dm->dims[1], dim2);
tao_forced_store(&dm->inds_offset, inds_offset);
tao_forced_store(&dm->refs_offset, refs_offset);
tao_forced_store(&dm->req_cmds_offset, req_cmds_offset);
tao_forced_store(&dm->act_cmds_offset, act_cmds_offset);
long* dest_inds = remote_mirror_inds(dm);
for (long i = 0; i < ninds; ++i) {
if (inds[i] >= 0) {
......@@ -290,10 +296,16 @@ const double *tao_get_remote_mirror_reference(
return (dm == NULL) ? NULL : remote_mirror_refs(dm);
}
const double *tao_get_remote_mirror_commands(
const double *tao_get_remote_mirror_requested_commands(
const tao_remote_mirror* dm)
{
return (dm == NULL) ? NULL : remote_mirror_cmds(dm);
return (dm == NULL) ? NULL : remote_mirror_req_cmds(dm);
}
const double *tao_get_remote_mirror_actual_commands(
const tao_remote_mirror* dm)
{
return (dm == NULL) ? NULL : remote_mirror_act_cmds(dm);
}
tao_status tao_quit_remote_mirror(
......@@ -467,7 +479,7 @@ tao_status tao_send_remote_mirror_commands(
}
// Prepare for i/o.
double* dest = remote_mirror_cmds(dm);
double* dest = remote_mirror_req_cmds(dm);
// Wait until server ready for a new task.
tao_status status = wait_deformable_mirror(dm, secs);
......@@ -477,7 +489,7 @@ tao_status tao_send_remote_mirror_commands(
// Copy commands, assign task, notify others, and unlock resources.
memcpy(dest, vals, nvals*sizeof(double));
dm->task = TAO_TASK_APPLY;
dm->task = TAO_TASK_SEND;
dm->mark = mark;
if (serial != NULL) {
*serial = dm->serial + 1;
......@@ -508,8 +520,8 @@ static inline tao_shared_mirror_dataframe_header* fetch_frame(
tao_serial serial)
{
size_t offset = data->offset + ((serial - 1) % data->nbufs)*data->stride;
tao_shared_mirror_dataframe_header* header = address(data, offset);
*refs = address(header, REFS_OFFSET_IN_DATAFRAME);
tao_shared_mirror_dataframe_header* header = TAO_COMPUTED_ADDRESS(data, offset);
*refs = TAO_COMPUTED_ADDRESS(header, REFS_OFFSET_IN_DATAFRAME);
*cmds = (*refs) + data->nacts;
return header;
}
......@@ -782,57 +794,214 @@ tao_status tao_load_shared_mirror_data(
return TAO_OK;
}
tao_status tao_store_shared_mirror_data(
tao_shared_mirror_data* data,
const tao_mirror_data_info* info,
const double* refs,
const double* cmds)
tao_status tao_publish_shared_mirror_data(
tao_remote_mirror* dm,
tao_shared_mirror_data* dat)
{
// Check arguments.
if (data == NULL || info == NULL || refs == NULL || cmds == NULL) {
if (dm == NULL || dat == NULL) {
tao_push_error(__func__, TAO_BAD_ADDRESS);
return TAO_ERROR;
}
long nacts = data->nacts;
if (nacts < 1 || data->nbufs < 2) {
long nacts = dm->nacts;
if (nacts < 1 || dat->nbufs < 2) {
tao_push_error(__func__, TAO_CORRUPTED);
return TAO_ERROR;
}
if (info->nacts != nacts) {
if (dat->nacts != nacts) {
tao_push_error(__func__, TAO_BAD_SIZE);
return TAO_ERROR;
}
tao_serial serial = info->serial;
if (serial < 1) {
if (dm->serial < 0) {
tao_push_error(__func__, TAO_BAD_SERIAL);
return TAO_ERROR;
}
// Increment counter of published frames.
tao_serial serial = ++dm->serial;
// Obtain publication time.
tao_time time;
tao_status status = tao_get_monotonic_time(&time);
if (status != TAO_OK) {
return TAO_ERROR;
}
// Write data-frame header and data. First set the data-frame serial
// number to zero to let others know that data-frame is being overwritten.
// Finally set the data-frame serial number to its value when all contents
// has been updated.
double* refs_dst;
double* cmds_dst;
tao_shared_mirror_dataframe_header* header =
fetch_frame(&refs_dst, &cmds_dst, data, serial);
const double* src_refs = remote_mirror_refs(dm);
const double* src_cmds = remote_mirror_act_cmds(dm);
double* dst_refs;
double* dst_cmds;
tao_shared_mirror_dataframe_header* header = fetch_frame(
&dst_refs, &dst_cmds, dat, serial);
size_t nbytes = nacts*sizeof(double);
atomic_store(&header->serial, 0); // 0 indicates invalid data-frame
header->mark = info->mark;
header->time = info->time;
memcpy(refs_dst, refs, nbytes);
memcpy(cmds_dst, cmds, nbytes);
header->mark = dm->mark;
header->time = time;
memcpy(dst_refs, src_refs, nbytes);
memcpy(dst_cmds, src_cmds, nbytes);
atomic_store(&header->serial, serial); // ≥ 1 indicates valid data-frame
// Set serial number of last data-frame while the shared resources are
// locked and notify others.
tao_status status = lock(data);
status = lock(dat);
if (status == TAO_OK) {
data->serial = serial;
if (broadcast_condition(data) != TAO_OK) {
dat->serial = serial;
if (broadcast_condition(dat) != TAO_OK) {
status = TAO_ERROR;
}
if (unlock(dat) != TAO_OK) {
status = TAO_ERROR;
}
}
return status;
}
tao_status tao_run_mirror_loop(
tao_remote_mirror* dm,
tao_shared_mirror_data* dat,
tao_remote_mirror_operations* ops,
void* ctx)
{
// Check arguments.
if (dm == NULL || dat == NULL || ops == NULL) {
tao_push_error(__func__, TAO_BAD_ADDRESS);
return TAO_ERROR;
}
if (ops->name == NULL) {
tao_push_error(__func__, TAO_BAD_NAME);
return TAO_ERROR;
}
if (dm->nacts < 1) {
tao_push_error(__func__, TAO_BAD_SIZE);
return TAO_ERROR;
}
if (tao_get_remote_mirror_telemetry(dm)
!= tao_get_shared_mirror_data_shmid(dat)) {
tao_push_error(__func__, TAO_BAD_VALUE); // FIXME: TAO_INCOMPATIBLE
return TAO_ERROR;
}
// Lock remote mirror before anyone else has the opportunity to lock it.
if (tao_lock_remote_mirror(dm) != TAO_OK) {
if (ops->debug) {
fprintf(stderr, "%s: failed to lock remote mirror instance\n",
ops->name);
}
return TAO_ERROR;
}
// Publish the shmid's of the shared resources.
if (ops->debug) {
fprintf(stderr, "%s: remote mirror available at shmid=%d\n",
ops->name, (int)tao_get_remote_mirror_shmid(dm));
fprintf(stderr, "%s: mirror telemetry available at shmid=%d\n",
ops->name, (int)tao_get_shared_mirror_data_shmid(dat));
}
tao_status status = tao_config_write_long(
ops->name, tao_get_remote_mirror_shmid(dm));
// Run loop (on entry of the loop we own the lock on the remote mirror
// instance).
bool publish = false;
while (true) {
// Wait for next command.
while (status == TAO_OK && dm->task == TAO_TASK_IDLE) {
status = wait_condition(dm);
}
if (status != TAO_OK) {
break;
}
// Execute command.
if (dm->task == TAO_TASK_RESET) {
if (ops->debug) {
fprintf(stderr, "%s: execute \"reset\" command\n", ops->name);
}
status = ops->on_reset(dm, ctx);
if (status != TAO_OK) {
if (ops->debug) {
fprintf(stderr, "%s: failed to reset deformable mirror\n",
ops->name);
}
break;
}
publish = true;
} else if (dm->task == TAO_TASK_SEND) {
if (ops->debug) {
fprintf(stderr, "%s: execute \"send\" command\n", ops->name);
}
status = ops->on_send(dm, ctx);
if (status != TAO_OK) {
if (ops->debug) {
fprintf(stderr, "%s: failed to send actuators command\n",
ops->name);
}
break;
}
publish = true;
} else if (dm->task == TAO_TASK_QUIT) {
if (ops->debug) {
fprintf(stderr, "%s: execute \"quit\" command\n", ops->name);
}
break;
} else {
if (ops->debug) {
fprintf(stderr, "%s: unknown command received (%d)\n",
ops->name, (int)dm->task);
}
}
// Publish next telemetry frame if requested.
if (publish) {
status = tao_publish_shared_mirror_data(dm, dat);
if (status != TAO_OK) {
if (ops->debug) {
fprintf(stderr, "%s: failed to publish mirror telemetry\n",
ops->name);
}
break;
}
publish = false;
}
// Change task to idle and notify others that the server is ready for a
// new command.
dm->task = TAO_TASK_IDLE;
status = broadcast_condition(dm);
if (status != TAO_OK) {
if (ops->debug) {
fprintf(stderr, "%s: failed to broadcast condition\n",
ops->name);
}
break;
}
}
// The server is no longer running and is unreachable.
dm->task = TAO_TASK_QUIT;
tao_forced_store(&dm->telemetry, TAO_BAD_SHMID);
if (tao_config_write_long(ops->name, TAO_BAD_SHMID) != TAO_OK) {
status = TAO_ERROR;
}
if (broadcast_condition(dm) != TAO_OK) {
status = TAO_ERROR;
}
if (tao_unlock_remote_mirror(dm) != TAO_OK) {
status = TAO_ERROR;
}
if (lock(dat) != TAO_OK) {
status = TAO_ERROR;
dat->running = false; // change value anyway
} else {
dat->running = false;
if (broadcast_condition(dat) != TAO_OK) {
status = TAO_ERROR;
}
if (unlock(data) != TAO_OK) {
if (unlock(dat) != TAO_OK) {
status = TAO_ERROR;
}
}
......
......@@ -12,73 +12,44 @@
#include <sys/types.h>
#include <unistd.h>
#define address(ptr, off) ((void*)(((uint8_t*)(ptr)) + (off)))
// FIXME: When server quit, it must set the running flag to false and notify others.
#define TAO_AS_SHARED_OBJECT_(obj) \
_Generic(obj, \
tao_shared_object *: (obj), \
tao_shared_array *: (tao_shared_object *)(obj), \
tao_shared_camera *: (tao_shared_object *)(obj), \
tao_shared_mirror *: (tao_shared_object *)(obj), \
tao_shared_mirror_data *: (tao_shared_object *)(obj), \
tao_remote_mirror *: (tao_shared_object *)(obj), \
tao_shared_object const*: (obj), \
tao_shared_array const*: (tao_shared_object const*)(obj), \
tao_shared_camera const*: (tao_shared_object const*)(obj), \
tao_shared_mirror const*: (tao_shared_object const*)(obj), \
tao_shared_mirror_data const*: (tao_shared_object const*)(obj), \
tao_remote_mirror const*: (tao_shared_object const*)(obj))
#define TAO_GET_OWNER_(ptr) \
((ptr) == NULL ? "" : TAO_AS_SHARED_OBJECT_(ptr)->owner)
#define TAO_GET_SHMID_(ptr) \
((ptr) == NULL ? TAO_BAD_SHMID : TAO_AS_SHARED_OBJECT_(ptr)->shmid)
#define lock(obj) \
tao_lock_mutex(&(obj)->base.shared.mutex)
#define try_lock(obj) \
tao_try_lock_mutex(&(obj)->base.shared.mutex)
#define timed_lock(obj, secs) \
tao_timed_lock_mutex(&(obj)->base.shared.mutex, secs)
#define abstimed_lock(obj, abstime) \
tao_abstimed_lock_mutex(&(obj)->base.shared.mutex, abstime)
#define unlock(obj) tao_unlock_mutex(&(obj)->base.shared.mutex)
#define wait_condition(obj) \
tao_wait_condition(&(obj)->base.shared.cond, \
&(obj)->base.shared.mutex)
#define timed_wait_condition(obj, secs) \
tao_timed_wait_condition(&(obj)->base.shared.cond, \
&(obj)->base.shared.mutex, \
secs)
#define abstimed_wait_condition(obj, abstime) \
tao_abstimed_wait_condition(&(obj)->base.shared.cond, \
&(obj)->base.shared.mutex, \
abstime)
#define broadcast_condition(obj) \
tao_broadcast_condition(&(obj)->base.shared.cond)
#define signal_condition(obj) \
tao_signal_condition(&(obj)->base.shared.cond)
static char const* ident = NULL;
static char const* progname = NULL;
static bool debug = false;
// Private data needed for the cleanup callback.
static tao_remote_mirror* dm = NULL;
static tao_shared_mirror_data* dat = NULL;
static long* inds = NULL;
static uint8_t* msk = NULL;
// Send the requested command.
static tao_status on_send(
tao_remote_mirror* dm,
void* ctx)
{
// Filter the requested commands to compute the actual commands. Both are
// relative to the reference commands.
double* act_cmds = TAO_REMOTE_MIRROR_ACT_CMDS(dm);
const double* req_cmds = TAO_REMOTE_MIRROR_REQ_CMDS(dm);
const double* refs = TAO_REMOTE_MIRROR_REFS(dm);
long nacts = dm->nacts;
for (long i = 0; i < nacts; ++i) {
double cmd = tao_clamp(req_cmds[i] + refs[i], -1.0, 1.0) - refs[i];
act_cmds[i] = isfinite(cmd) ? cmd : 0.0;
}
return TAO_OK;
}
// Reset the deformable mirror. This is equivalent to send all commands set to
// zero.
static tao_status on_reset(
tao_remote_mirror* dm,
void* ctx)
{
double* req_cmds = TAO_REMOTE_MIRROR_REQ_CMDS(dm);
long nacts = dm->nacts;
for (long i = 0; i < nacts; ++i) {
req_cmds[i] = 0.0;
}
return on_send(dm, ctx);
}
// FIXME: Setting dm->task to TAO_TASK_QUIT, dat->running to false and clearing
// shmid in global configuration should be done automatically for
// server.
......@@ -106,29 +77,6 @@ static void cleanup(void)
free(inds);
inds = NULL;
}
if (ident != NULL) {
// Write invalid shmid.
// FIXME: This is done by tao_remote_object_destroy.
tao_config_write_long(ident, TAO_BAD_SHMID);
}
}
static tao_status store_commands(
tao_shared_mirror_data* data,
const double* refs,
const double* cmds,
tao_serial serial)
{
tao_mirror_data_info info = {
.serial = dm->serial,
.mark = dm->mark,
.nacts = dm->nacts,
};
tao_status status = tao_get_monotonic_time(&info.time);
if (status == TAO_OK) {
status = tao_store_shared_mirror_data(dat, &info, refs, cmds);
}
return status;
}
int main(
......@@ -136,7 +84,7 @@ int main(
char* argv[])
{
// Determine program name.
progname = tao_basename(argv[0]);
const char* progname = tao_basename(argv[0]);
// Install function to free all allocated resources.
if (atexit(cleanup) != 0) {
......@@ -146,6 +94,8 @@ int main(
}
// Parse arguments.
char const* ident = NULL;
bool debug = false;
long nbufs = 10000;
long nacts = 97;
unsigned int orient = 0;
......@@ -321,111 +271,15 @@ int main(
return EXIT_FAILURE;
}
// Lock remote mirror before anyone else has the opportunity to lock it.
if (tao_lock_remote_mirror(dm) != TAO_OK) {
fprintf(stderr, "%s: failed to lock remote mirror instance\n",
progname);
tao_report_errors();
return EXIT_FAILURE;
}
// Publish the shmid's of the shared resources.
if (debug) {
fprintf(stderr, "%s: remote mirror available at shmid=%d\n",
progname, (int)tao_get_remote_mirror_shmid(dm));
fprintf(stderr, "%s: mirror telemetry available at shmid=%d\n",
progname, (int)tao_get_shared_mirror_data_shmid(dat));
}
tao_config_write_long(ident, tao_get_remote_mirror_shmid(dm));
// Run loop (on entry of the loop we own the lock on the remote mirror
// instance).
bool notify = false;
tao_status status = TAO_OK;
double* refs = address(dm, dm->refs_offset);
double* cmds = address(dm, dm->cmds_offset);
double* buf = tao_malloc(nacts*sizeof(double));
if (buf == NULL) {
fprintf(stderr, "%s: failed to allocate buffer of commands\n",
progname);
tao_report_errors();
return EXIT_FAILURE;
}
while (status == TAO_OK && dm->task != TAO_TASK_QUIT) {
while (status == TAO_OK && dm->task == TAO_TASK_IDLE) {
status = wait_condition(dm);
}
if (status != TAO_OK) {
break;
}
if (dm->task == TAO_TASK_RESET) {
// Set commands so that deformable mirror is at its zero level.
if (debug) {
fprintf(stderr, "%s: \"reset\" command received\n", progname);
}
for (long i = 0; i < nacts; ++i) {
buf[i] = -refs[i];
}
++dm->serial;
dm->task = TAO_TASK_IDLE;
notify = true;
status = store_commands(dat, refs, buf, dm->serial);
if (status != TAO_OK) {
fprintf(stderr, "%s: failed to store commands\n", progname);
break;
}
} else if (dm->task == TAO_TASK_APPLY) {
// Set commands so that deformable mirror is at its zero level.
if (debug) {
fprintf(stderr, "%s: \"apply\" command received\n", progname);
}
for (long i = 0; i < nacts; ++i) {
double val = tao_clamp(cmds[i] + refs[i], -1.0, 1.0);
double cmd = val - refs[i];
buf[i] = isfinite(val) ? cmd : 0.0;
}
++dm->serial;
dm->task = TAO_TASK_IDLE;
notify = true;
status = store_commands(dat, refs, buf, dm->serial);
if (status != TAO_OK) {
fprintf(stderr, "%s: failed to store commands\n", progname);
break;
}
} else if (dm->task == TAO_TASK_QUIT) {
if (debug) {
fprintf(stderr, "%s: \"quit\" command received\n", progname);
}
} else {
fprintf(stderr, "%s: unknown command received (%d)\n", progname,
(int)dm->task);
dm->task = TAO_TASK_IDLE;
notify = true;
}
if (notify) {
status = broadcast_condition(dm);
if (status != TAO_OK) {
fprintf(stderr, "%s: failed to broadcast condition\n",
progname);
break;
}
notify = false;
}
}
if (dat->running != false) {
// FIXME: lock and notify
dat->running = false;