remote-objects.c 10.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
// remote-objects.c -
//
// Management of basic shared objects used to communicate with remote server.
//
//-----------------------------------------------------------------------------
//
// This file if part of TAO real-time software licensed under the MIT license
// (https://git-cral.univ-lyon1.fr/tao/tao-rt).
//
// Copyright (C) 2018-2022, Éric Thiébaut.

#include <string.h>

#include "tao-basics.h"
#include "tao-errors.h"
#include "tao-macros.h"
#include "tao-generic.h"
#include "tao-shared-objects.h"
#include "tao-remote-objects-private.h"

const char* tao_state_get_name(
    tao_state state)
{
    switch (state) {
    case TAO_STATE_INITIALIZING:
        return "initializing";
    case TAO_STATE_WAITING:
        return "waiting";
    case TAO_STATE_STARTING:
        return "starting";
    case TAO_STATE_WORKING:
        return "working";
    case TAO_STATE_STOPPING:
        return "stopping";
    case TAO_STATE_ABORTING:
        return "aborting";
    case TAO_STATE_KILLED:
        return "killed";
    default:
        return "unknown";
    }
}

tao_remote_object* tao_remote_object_create(
    const char* owner,
    tao_object_type type,
    long nbufs,
    long offset,
    long stride,
    size_t size,
    unsigned flags)
{
    long len = TAO_STRLEN(owner);
    if (len < 1 || len >= TAO_OWNER_SIZE) {
        tao_store_error(__func__, TAO_BAD_NAME);
        return NULL;
    }
    if ((type & TAO_SHARED_SUPERTYPE_MASK) != TAO_REMOTE_OBJECT) {
        tao_store_error(__func__, TAO_BAD_TYPE);
        return NULL;
    }
    if (nbufs < 0 || offset < sizeof(tao_remote_object) || stride < 0 ||
        size < offset + nbufs*stride) {
        tao_store_error(__func__, TAO_BAD_SIZE);
        return NULL;
    }
    tao_remote_object* obj = (tao_remote_object*)tao_shared_object_create(
        type, size, flags);
    if (obj == NULL) {
        return NULL;
    }
    for (long i = 0; i < len; ++i) {
        ((char*)obj->owner)[i] = owner[i];
    }
    ((char*)obj->owner)[len] = '\0';
    tao_forced_store(&obj->nbufs,  nbufs);
    tao_forced_store(&obj->offset, offset);
    tao_forced_store(&obj->stride, stride);
    obj->serial = 0;
    obj->state = TAO_STATE_INITIALIZING;
    obj->command = TAO_COMMAND_NONE;
    return obj;
}

tao_remote_object* tao_remote_object_attach(
    tao_shmid shmid)
{
    tao_shared_object* obj = tao_shared_object_attach(shmid);
    if (obj != NULL &&
        (obj->type & TAO_SHARED_SUPERTYPE_MASK) == TAO_REMOTE_OBJECT) {
        return (tao_remote_object*)obj;
    }
    tao_shared_object_detach(obj);
    tao_store_error(__func__, TAO_BAD_TYPE);
    return NULL;
}

// Basic methods.
#define TYPE remote_object
#define IS_REMOTE_OBJECT 1
#include "./shared-methods.c"

// Check whether the server owning the remote object is alive.
//
// NOTE: Since server state is an *atomic* variable, the caller may not have
// locked the object.
static inline bool is_alive(
    const tao_remote_object* obj)
{
    return (obj->state != TAO_STATE_KILLED);
}

// Inline function to check whether a client should wait for the remote server
// to be ready to accept commands.
//
// NOTE: Since communication is asynchronous, we can assume that a server may
//       receive at most one command while initializing.
static inline bool wait_for_command(
    tao_status status,
    const tao_remote_object* obj)
{
    return (status == TAO_OK && is_alive(obj)
            && obj->command != TAO_COMMAND_NONE);
}

// Inline function to check whether a client should wait for the requested
// frame to be available.
static inline bool wait_for_serial(
    tao_status status,
    const tao_remote_object* obj,
    tao_serial serial)
{
    return (status == TAO_OK && is_alive(obj)
            && obj->serial < serial);
}

tao_serial tao_remote_object_wait_serial(
    tao_remote_object* obj,
    tao_serial         serial,
    double             secs)
{
    // Check argument(s) and figure out which buffer to wait for.
    if (obj == NULL) {
        tao_store_error(__func__, TAO_BAD_ADDRESS);
        goto error;
    }
    tao_serial last = obj->serial; // NOTE: ok because atomic
    if (last < 0) {
        tao_store_error(__func__, TAO_CORRUPTED);
        goto error;
    }
    if (serial <= 0) {
        // Manage to wait for next buffer.
        serial = last + 1;
    }
    tao_status status = TAO_OK;
    if (serial > last) {
        // Convert seconds to absolute time, then wait for next buffers until a
        // sufficient serial number is reached or the time limit is exhausted.
        tao_time abstime;
        bool locked = false;
        switch (tao_get_absolute_timeout(&abstime, secs)) {
        case TAO_TIMEOUT_NEVER:
            status = tao_remote_object_lock(obj);
            locked = (status == TAO_OK);
            while (wait_for_serial(status, obj, serial)) {
                status = tao_remote_object_wait_condition(obj);
            }
            break;
        case TAO_TIMEOUT_FUTURE:
            status = tao_remote_object_abstimed_lock(obj, &abstime);
            locked = (status == TAO_OK);
            while (wait_for_serial(status, obj, serial)) {
                status = tao_remote_object_abstimed_wait_condition(
                    obj, &abstime);
            }
            break;
        case TAO_TIMEOUT_NOW:
            status = tao_remote_object_try_lock(obj);
            locked = (status == TAO_OK);
            if (wait_for_serial(status, obj, serial)) {
                status = TAO_TIMEOUT;
            }
            break;
        case TAO_TIMEOUT_PAST:
            // Return a value indicating a timeout.
            status = TAO_TIMEOUT;
            break;
        default:
            // The only remaining possibility is that
            // `tao_get_absolute_timeout` returned an error
            // (TAO_TIMEOUT_ERROR).
            status = TAO_ERROR;
            break;
        }
        // Unlock ressources.
        if (locked && tao_remote_object_unlock(obj) != TAO_OK) {
            status = TAO_ERROR;
        }
    }

    // If no problems occurred, the result is the serial number of the
    // requested buffer; otherwise, it is a nonpositive number indicating the
    // problem.  The serial number of the last output buffer is loaded again to
    // maximize the odds of success of the operation.
    if (status != TAO_ERROR) {
        last = obj->serial; // NOTE: ok because atomic
        if (last < 0) {
            tao_store_error(__func__, TAO_CORRUPTED);
            goto error;
        }
        if (serial <= last) {
            if (serial > last - obj->nbufs) {
                // The requested buffer is available in the cyclic list of
                // buffers.
                return serial;
            } else {
                // The requested buffer is too old.
                return -1;
            }
        } else if (is_alive(obj)) {
            // A timeout occurred.
            return 0;
        } else {
            // The requested buffer will never be available because the server
            // is no longer alive.
            return -2;
        }
    }

Éric Thiébaut's avatar
Éric Thiébaut committed
231
    // An error has occurred.
232
233
234
235
236
237
error:
    return -3;
}

tao_status tao_remote_object_lock_for_command(
    tao_remote_object* obj,
Éric Thiébaut's avatar
Éric Thiébaut committed
238
    tao_command command,
239
240
241
    double secs)
{
    // Minimal check.
Éric Thiébaut's avatar
Éric Thiébaut committed
242
    if (obj == NULL) {
243
244
245
246
        tao_store_error(__func__, TAO_BAD_ADDRESS);
        return TAO_ERROR;
    }

Éric Thiébaut's avatar
Éric Thiébaut committed
247
248
    // Convert seconds to absolute time, then lock resources and wait until the
    // server is ready for a new command, killed, or some error occurs.
249
250
    tao_status status;
    tao_time abstime;
Éric Thiébaut's avatar
Éric Thiébaut committed
251
    bool locked;
252
253
254
    switch (tao_get_absolute_timeout(&abstime, secs)) {
    case TAO_TIMEOUT_NEVER:
        status = tao_remote_object_lock(obj);
Éric Thiébaut's avatar
Éric Thiébaut committed
255
        locked = (status == TAO_OK);
256
257
258
259
260
261
        while (wait_for_command(status, obj)) {
            status = tao_remote_object_wait_condition(obj);
        }
        break;
    case TAO_TIMEOUT_FUTURE:
        status = tao_remote_object_abstimed_lock(obj, &abstime);
Éric Thiébaut's avatar
Éric Thiébaut committed
262
        locked = (status == TAO_OK);
263
264
265
266
267
268
        while (wait_for_command(status, obj)) {
            status = tao_remote_object_abstimed_wait_condition(obj, &abstime);
        }
        break;
    case TAO_TIMEOUT_NOW:
        status = tao_remote_object_try_lock(obj);
Éric Thiébaut's avatar
Éric Thiébaut committed
269
        locked = (status == TAO_OK);
270
271
272
273
274
        if (wait_for_command(status, obj)) {
            status = TAO_TIMEOUT;
        }
        break;
    case TAO_TIMEOUT_PAST:
Éric Thiébaut's avatar
Éric Thiébaut committed
275
        locked = false;
276
277
278
        status = TAO_TIMEOUT;
        break;
    default: // TAO_TIMEOUT_ERROR
Éric Thiébaut's avatar
Éric Thiébaut committed
279
        locked = false;
280
281
282
        status = TAO_ERROR;
        break;
    }
Éric Thiébaut's avatar
Éric Thiébaut committed
283
    if (status == TAO_OK) {
Éric Thiébaut's avatar
Éric Thiébaut committed
284
        // Server is either dead or idle.
Éric Thiébaut's avatar
Éric Thiébaut committed
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
        if (obj->state == TAO_STATE_KILLED) {
            // Server has been killed.
            if (command != TAO_COMMAND_KILL) {
#if TAO_ASSUME_TIMOUT_IF_SERVER_KILLED
                status = TAO_TIMEOUT;
#else
                tao_store_error(func, TAO_NOT_RUNNING);
                status = TAO_ERROR;
#endif
            }
        } else if (obj->command == TAO_COMMAND_NONE) {
            // Server is ready to accept a new command.
            if (command != TAO_COMMAND_NONE) {
                obj->command = command;
                if (tao_remote_object_broadcast_condition(obj) != TAO_OK) {
                    obj->command = TAO_COMMAND_NONE;
                    status = TAO_ERROR;
                }
            }
        } else  {
            // Unexpected result.
            tao_store_error(__func__, TAO_ASSERTION_FAILED);
            status = TAO_ERROR;
        }
    }

    // In case of error, if object has been locked, unlock it.
    if (status != TAO_OK && locked && tao_remote_object_unlock(obj) != TAO_OK) {
        status = TAO_ERROR;
    }

316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
    return status;
}

tao_status tao_remote_object_send_simple_command(
    tao_remote_object* obj,
    tao_command command,
    double secs)
{
    if (obj == NULL) {
        tao_store_error(__func__, TAO_BAD_ADDRESS);
        return TAO_ERROR;
    }
    if (command == TAO_COMMAND_KILL) {
        if (! is_alive(obj)) {
            // Assume success for a "kill" command when the server is no longer
            // alive.
            return TAO_OK;
        }
    }
Éric Thiébaut's avatar
Éric Thiébaut committed
335
    tao_status status = tao_remote_object_lock_for_command(obj, command, secs);
336
    if (status == TAO_OK) {
Éric Thiébaut's avatar
Éric Thiébaut committed
337
338
        // Command is about to be sent, nothing else to do than unlock.
        if (tao_remote_object_unlock(obj) != TAO_OK) {
339
340
341
342
343
            status = TAO_ERROR;
        }
    }
    if (command == TAO_COMMAND_KILL) {
        // Assume success upon timout if the server is no longer alive.
Éric Thiébaut's avatar
Éric Thiébaut committed
344
        if (status == TAO_TIMEOUT && ! is_alive(obj)) {
345
346
347
348
349
350
351
352
353
354
355
356
357
            status = TAO_OK;
        }
    }
    return status;
}

tao_status tao_remote_object_kill(
    tao_remote_object* obj,
    double secs)
{
    // NOTE: Checking of arguments is done by called functions.
    return tao_remote_object_send_simple_command(obj, TAO_COMMAND_KILL, secs);
}