1 contributor
523 lines13.5 KB
Newer
Older
-
+
commited
{line.log.rev}
on
8 months ago
4
1
#include "coroutine.h"
2
#include <assert.h>
3
#include <setjmp.h>
4
#include <stdbool.h>
5
#include <stddef.h>
6
#include <stdio.h>
8 months ago
7
#include <pthread.h>
8
#include <stdlib.h>
8 months ago
9
#include "cor_thread_local.h"
8 months ago
4
10
11
12
8 months ago
13
static void *mustmalloc(size_t size){
14
void *p = malloc(size);
15
assert(p);
16
return p;
17
}
18
19
#define New(type, ...) (type##_ctor((type *)mustmalloc(sizeof(type), ## __VA_ARGS__)))
20
#define Delete(ptr, type) ((ptr) ? (type##_dtor(ptr), free(ptr), (ptr) = NULL) : (void)0)
21
8 months ago
4
22
///////////////////////////////////////////////////////////////////////////////
8 months ago
23
// Semaphore built from mutex & condition variables...
24
//
25
// Using pthread.h (more widely available than the C standard library thread.h)
26
///////////////////////////////////////////////////////////////////////////////
27
28
typedef struct Semaphore {
29
pthread_mutex_t mutex;
30
pthread_cond_t cond;
31
int count;
32
} Semaphore;
33
34
static void Semaphore_ctor(Semaphore *sem, int initial_count){
35
sem->count = initial_count;
36
int r = pthread_mutex_init(&sem->mutex, NULL);
37
assert(r == 0);
38
r = pthread_cond_init(&sem->cond, NULL);
39
assert(r == 0);
40
}
41
42
static void Semaphore_dtor(Semaphore *sem){
43
int r = pthread_mutex_destroy(&sem->mutex);
44
assert(r == 0);
45
r = pthread_cond_destroy(&sem->cond);
46
assert(r == 0);
47
}
48
49
static void Semaphore_Claim(Semaphore *sem){
50
int r = pthread_mutex_lock(&sem->mutex);
51
assert(r == 0);
52
while (sem->count <= 0) {
53
r = pthread_cond_wait(&sem->cond, &sem->mutex);
54
assert(r == 0);
55
}
56
sem->count--;
57
r = pthread_mutex_unlock(&sem->mutex);
58
assert(r == 0);
59
}
60
61
static void Semaphore_Release(Semaphore *sem){
62
int r = pthread_mutex_lock(&sem->mutex);
63
assert(r == 0);
64
sem->count++;
65
r = pthread_cond_broadcast(&sem->cond);
66
assert(r == 0);
67
r = pthread_mutex_unlock(&sem->mutex);
68
assert(r == 0);
69
}
70
71
///////////////////////////////////////////////////////////////////////////////
72
// ...semaphore built from mutex
73
///////////////////////////////////////////////////////////////////////////////
74
75
///////////////////////////////////////////////////////////////////////////////
8 months ago
4
76
// 2-way linked lists...
77
//
8 months ago
78
// Brought inline here to avoid namespace polution
8 months ago
4
79
///////////////////////////////////////////////////////////////////////////////
80
81
typedef struct List_Link List_Link;
82
struct List_Link {
83
List_Link *next;
84
List_Link *prev;
85
};
86
87
typedef struct List_Head List_Head;
88
struct List_Head {
89
union {
90
struct {
91
List_Link link;
92
List_Link *filler;
93
} fwd;
94
struct {
95
List_Link *filler;
96
List_Link link;
97
} back;
98
};
99
};
100
101
static inline bool List_IsEmpty(const List_Head *list) {
102
return list->fwd.link.next == &list->back.link;
103
}
104
105
static inline List_Link *List_GetHead(const List_Head *list) {
106
return List_IsEmpty(list) ? NULL : list->fwd.link.next;
107
}
108
static inline List_Link *List_GetTail(const List_Head *list) {
109
return List_IsEmpty(list) ? NULL : list->back.link.prev;
110
}
111
#define OFFSETOF(Container, Field) ((char *)&((Container *)4)->Field - (char *)(Container *)4)
112
#define List_Link_Container(Container, Link, link) ((Container *)((char *)(link) - OFFSETOF(Container, Link)))
113
114
static inline void List_Init(List_Head *list)
115
{
116
list->fwd.link.next = &list->back.link;
117
list->fwd.link.prev = NULL;
118
list->back.link.prev = &list->fwd.link;
119
}
120
121
static inline void List_AddHead(List_Head *list, List_Link *link)
122
{
123
List_Link *first = list->fwd.link.next;
124
link->next = first;
125
link->prev = &list->fwd.link;
126
first->prev = link;
127
list->fwd.link.next = link;
128
}
129
130
static inline void List_AddTail(List_Head *list, List_Link *link)
131
{
132
List_Link *last = list->back.link.prev;
133
link->prev = last;
134
link->next = &list->back.link;
135
last->next = link;
136
list->back.link.prev = link;
137
}
138
139
static inline void List_Remove(List_Link *link)
140
{
141
link->prev->next = link->next;
142
link->next->prev = link->prev;
143
}
144
145
///////////////////////////////////////////////////////////////////////////////
146
// ...2-way linked lists
147
///////////////////////////////////////////////////////////////////////////////
148
8 months ago
149
typedef struct Coroutines Coroutines;
8 months ago
4
150
151
enum {
152
Coroutines_Idle,
153
Coroutines_Starting,
154
Coroutines_Started,
155
Coroutines_Active,
156
Coroutines_Stopping
157
};
158
159
enum {
160
Chunk_Initial,
161
Chunk_Create,
162
Chunk_Enter
163
};
164
165
enum {
166
Coroutine_Constructing,
167
Coroutine_Free,
168
Coroutine_Idle,
169
Coroutine_Running,
170
Coroutine_Waiting,
171
Coroutine_Complete
172
};
173
174
enum {
175
Coroutines_Init,
176
Coroutines_AllocatedChunk,
177
Coroutines_CoroutineComplete,
178
};
179
180
struct Coroutine {
8 months ago
181
Coroutines *coroutines;
8 months ago
4
182
List_Link link;
183
jmp_buf buf;
184
void *this;
185
Coroutine_Start start;
186
void *entry_param;
187
void *value;
188
char state;
189
char action;
190
};
191
192
struct Coroutines {
8 months ago
193
pthread_mutex_t mutex;
8 months ago
4
194
jmp_buf controller;
195
jmp_buf chunk_allocated;
196
197
// singletons
198
Coroutine *tip; // top of stack chunk
199
Coroutine *active; // currently running coroutine
200
Coroutine *primary; // Coroutine_Run coroutine
201
202
// lists
203
List_Head free;
204
List_Head inactive; // idle or complete
205
List_Head runable; // running or waiting to run
206
List_Head waiting; // yielded / waiting to run
8 months ago
207
Semaphore waiting_sem;
8 months ago
4
208
209
// state
210
char state;
211
};
212
8 months ago
213
_Cor_thread_local Coroutines *g_c;
8 months ago
4
214
215
static void stack_chunk_chunk(Coroutine *parent);
216
static void stack_chunk_base(Coroutine *parent);
217
8 months ago
218
8 months ago
4
219
static void Coroutine_PrimeStackChunks()
220
{
221
unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
222
chunk_of_stack[0] = 0xde;
223
chunk_of_stack[1] = 0xad;
224
chunk_of_stack[2] = 0xbe;
225
chunk_of_stack[3] = 0xef;
226
chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
227
chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
228
chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
229
chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
230
stack_chunk_base(NULL);
231
}
232
8 months ago
233
234
static void stack_chunk_chunk(
235
Coroutine *parent
236
){
8 months ago
4
237
unsigned char chunk_of_stack[COROUTINE_STACK_SIZE];
238
chunk_of_stack[0] = 0xde;
239
chunk_of_stack[1] = 0xad;
240
chunk_of_stack[2] = 0xbe;
241
chunk_of_stack[3] = 0xef;
242
chunk_of_stack[COROUTINE_STACK_SIZE - 4] = 0xde;
243
chunk_of_stack[COROUTINE_STACK_SIZE - 3] = 0xad;
244
chunk_of_stack[COROUTINE_STACK_SIZE - 2] = 0xbe;
245
chunk_of_stack[COROUTINE_STACK_SIZE - 1] = 0xef;
246
stack_chunk_base(parent);
247
}
248
8 months ago
249
8 months ago
4
250
static void Coroutine_RunNext()
251
{
8 months ago
252
// arrvie here with mutex unlocked
8 months ago
253
Semaphore_Claim(&g_c->waiting_sem);
254
int r = pthread_mutex_lock(&g_c->mutex);
8 months ago
255
assert(r == 0);
8 months ago
256
assert(!List_IsEmpty(&g_c->runable));
257
Coroutine *next = List_Link_Container(Coroutine, link, List_GetHead(&g_c->runable));
8 months ago
4
258
assert(next->state == Coroutine_Running);
259
longjmp(next->buf, Chunk_Enter);
260
assert(false);
261
}
262
8 months ago
263
264
static void stack_chunk_base(
265
Coroutine *parent
266
){
8 months ago
4
267
Coroutine here;
268
here.state = Coroutine_Constructing;
269
switch (setjmp(here.buf)) {
270
case Chunk_Initial:
271
// got here for the first time
272
// parent now has a chunk_of_stack - add it to the free list
273
if (parent) {
274
assert(parent->state == Coroutine_Constructing);
275
parent->state = Coroutine_Free;
8 months ago
276
List_AddHead(&g_c->free, &parent->link);
8 months ago
4
277
}
278
// note that here is the tip of the chunk-claim stack
8 months ago
279
here.coroutines = g_c;
280
g_c->tip = &here;
8 months ago
4
281
282
// return to the coroutine allocator
8 months ago
283
longjmp(g_c->chunk_allocated, 1);
8 months ago
4
284
case Chunk_Create:
285
// request to create a new chunk on the stack
286
assert(here.state == Coroutine_Constructing);
287
stack_chunk_chunk(&here);
288
assert(false);
289
case Chunk_Enter:
290
// request to start a coroutine (ie use the chunk for a coroutine)
8 months ago
291
// arrive here with mutex locked
8 months ago
4
292
assert(here.state == Coroutine_Running);
8 months ago
293
g_c->active = &here;
294
int r = pthread_mutex_unlock(&g_c->mutex);
8 months ago
295
assert(r == 0);
8 months ago
4
296
here.value = here.start(here.entry_param);
8 months ago
297
r = pthread_mutex_lock(&g_c->mutex);
8 months ago
298
assert(r == 0);
8 months ago
299
g_c->active = NULL;
8 months ago
4
300
assert(here.state == Coroutine_Running);
301
List_Remove(&here.link);
302
here.state = Coroutine_Complete;
8 months ago
303
List_AddTail(&g_c->inactive, &here.link);
8 months ago
4
304
// coroutine has completed
8 months ago
305
if (g_c->primary == &here) {
8 months ago
4
306
// if primary coroutine - return to Coroutine_Run
8 months ago
307
longjmp(g_c->controller, Coroutines_CoroutineComplete);
8 months ago
4
308
}
8 months ago
309
r = pthread_mutex_unlock(&g_c->mutex);
8 months ago
310
assert(r == 0);
8 months ago
4
311
Coroutine_RunNext();
312
assert(false);
313
}
314
}
315
8 months ago
316
8 months ago
4
317
void Coroutine_StartSystem()
318
{
8 months ago
319
assert(!g_c);
320
g_c = mustmalloc(sizeof(Coroutines));
8 months ago
321
8 months ago
322
g_c->state = Coroutines_Starting;
323
8 months ago
324
pthread_mutexattr_t attr;
325
int r = pthread_mutexattr_init(&attr);
326
assert(r == 0);
327
r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
328
assert(r == 0);
8 months ago
329
r = pthread_mutex_init(&g_c->mutex, &attr);
8 months ago
330
assert(r == 0);
331
r = pthread_mutexattr_destroy(&attr);
332
assert(r == 0);
333
8 months ago
334
g_c->tip = NULL;
335
g_c->active = NULL;
8 months ago
4
336
8 months ago
337
List_Init(&g_c->free);
338
List_Init(&g_c->inactive);
339
List_Init(&g_c->runable);
340
List_Init(&g_c->waiting);
341
Semaphore_ctor(&g_c->waiting_sem, 0);
8 months ago
4
342
343
// prime the chunk system
8 months ago
344
if (!setjmp(g_c->chunk_allocated)){
8 months ago
4
345
Coroutine_PrimeStackChunks();
346
assert(false);
347
}
8 months ago
348
349
assert(g_c->state == Coroutines_Starting);
350
g_c->state = Coroutines_Started;
8 months ago
4
351
}
352
8 months ago
353
8 months ago
4
354
void Coroutine_StopSystem()
355
{
8 months ago
356
int r = pthread_mutex_lock(&g_c->mutex);
8 months ago
357
assert(r == 0);
8 months ago
358
assert(g_c->state == Coroutines_Started);
359
g_c->state = Coroutines_Stopping;
8 months ago
4
360
8 months ago
361
assert(List_IsEmpty(&g_c->inactive));
362
Semaphore_dtor(&g_c->waiting_sem);
8 months ago
4
363
8 months ago
364
assert(g_c->state == Coroutines_Stopping);
365
pthread_mutex_unlock(&g_c->mutex);
8 months ago
366
assert(r == 0);
8 months ago
367
g_c->state = Coroutines_Idle;
368
r = pthread_mutex_destroy(&g_c->mutex);
8 months ago
369
assert(r == 0);
8 months ago
370
free(g_c);
371
g_c = NULL;
8 months ago
4
372
}
373
8 months ago
374
375
void *Coroutine_Run(
376
Coroutine *cor,
377
void *value
378
){
379
Coroutines *cors = cor->coroutines;
380
assert(g_c == cors);
381
int r = pthread_mutex_lock(&cors->mutex);
8 months ago
382
assert(r == 0);
8 months ago
383
assert(cors->state == Coroutines_Started);
384
cors->state = Coroutines_Active;
385
cors->primary = cor;
386
Coroutine_Continue(cors->primary, value, true);
8 months ago
4
387
8 months ago
388
if (!setjmp(cors->controller)){
389
pthread_mutex_unlock(&cors->mutex);
8 months ago
390
assert(r == 0);
8 months ago
4
391
// start the first coroutine
392
Coroutine_RunNext();
393
}
8 months ago
394
// arrive here with mutex locked
8 months ago
395
assert(List_IsEmpty(&cors->runable));
396
assert(List_IsEmpty(&cors->waiting));
397
assert(cors->state == Coroutines_Active);
398
cors->state = Coroutines_Started;
399
pthread_mutex_unlock(&cors->mutex);
8 months ago
400
assert(r == 0);
8 months ago
4
401
return Coroutine_GetValue(cor);
402
}
403
404
8 months ago
405
Coroutine *Coroutine_New(
406
Coroutine_Start start
407
){
408
assert(g_c->state == Coroutines_Started || g_c->state == Coroutines_Active);
409
8 months ago
4
410
// if none free - add one
8 months ago
411
if (List_IsEmpty(&g_c->free)){
412
if (!setjmp(g_c->chunk_allocated)){
413
longjmp(g_c->tip->buf, Chunk_Create);
8 months ago
4
414
}
415
}
416
8 months ago
417
Coroutine *cor = List_Link_Container(Coroutine, link, List_GetHead(&g_c->free));
8 months ago
4
418
assert(cor->state == Coroutine_Free);
419
cor->state = Coroutine_Idle;
420
cor->start = start;
421
cor->value = NULL;
422
List_Remove(&cor->link);
8 months ago
423
List_AddHead(&g_c->inactive, &cor->link);
8 months ago
4
424
425
return cor;
426
}
427
8 months ago
428
429
void Coroutine_Delete(
430
Coroutine *cor
431
){
432
Coroutines *cors = cor->coroutines;
433
int r = pthread_mutex_lock(&cors->mutex);
434
assert(r == 0);
8 months ago
4
435
assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Complete);
436
cor->state = Coroutine_Free;
437
List_Remove(&cor->link);
8 months ago
438
List_AddTail(&cors->free, &cor->link);
439
r = pthread_mutex_unlock(&cors->mutex);
440
assert(r == 0);
8 months ago
4
441
}
442
8 months ago
443
444
void Coroutine_Continue(
445
Coroutine *cor,
446
void *value,
447
bool early
448
){
449
Coroutines *cors = cor->coroutines;
450
int r = pthread_mutex_lock(&cors->mutex);
8 months ago
451
assert(r == 0);
8 months ago
4
452
assert(cor->state == Coroutine_Idle || cor->state == Coroutine_Waiting);
453
cor->entry_param = value;
454
cor->state = Coroutine_Running;
455
List_Remove(&cor->link);
456
if ( early ) {
8 months ago
457
List_AddHead(&cors->runable, &cor->link);
8 months ago
4
458
} else {
8 months ago
459
List_AddTail(&cors->runable, &cor->link);
8 months ago
4
460
}
8 months ago
461
r = pthread_mutex_unlock(&cors->mutex);
8 months ago
462
assert(r == 0);
8 months ago
463
Semaphore_Release(&cors->waiting_sem);
8 months ago
4
464
}
465
8 months ago
466
467
void *Coroutine_Yield(
468
void *value,
469
Coroutine_YieldCallback on_yield,
470
void *this
471
){
472
int r = pthread_mutex_lock(&g_c->mutex);
8 months ago
473
assert(r == 0);
8 months ago
474
Coroutine *me = g_c->active;
475
Coroutines *cors = me->coroutines;
476
assert(me && me->state == Coroutine_Running && cors == g_c);
8 months ago
4
477
me->value = value;
478
me->state = Coroutine_Waiting;
479
List_Remove(&me->link);
8 months ago
480
List_AddTail(&cors->waiting, &me->link);
8 months ago
481
switch (setjmp(me->buf)){
482
case Chunk_Initial:
8 months ago
483
r = pthread_mutex_unlock(&cors->mutex);
8 months ago
484
assert(r == 0);
8 months ago
485
on_yield(this);
8 months ago
4
486
Coroutine_RunNext();
8 months ago
487
case Chunk_Create:
488
assert(false);
489
case Chunk_Enter:
490
// arrive here with mutex locked
8 months ago
491
cors->active = me;
8 months ago
492
// when we return here - we are running again
493
assert(me->state == Coroutine_Running);
494
void *res = me->entry_param;
8 months ago
495
r = pthread_mutex_unlock(&cors->mutex);
8 months ago
496
assert(r == 0);
497
return res;
8 months ago
4
498
}
8 months ago
499
return NULL;
8 months ago
4
500
}
501
8 months ago
502
503
void *Coroutine_GetValue(
504
Coroutine *cor
505
){
8 months ago
4
506
return cor->value;
507
}
508
8 months ago
509
8 months ago
4
510
Coroutine *Coroutine_GetActive()
511
{
8 months ago
512
return g_c->active;
8 months ago
4
513
}
514
8 months ago
515
516
bool Coroutine_IsRunning(
517
Coroutine *cor
518
)
8 months ago
4
519
{
8 months ago
520
int state = cor->state;
521
return state == Coroutine_Running || state == Coroutine_Waiting;
8 months ago
4
522
}
523