216 lines4.4 KB
Newer
Older
-
+
committed
{line.log.rev}
on
9 months ago
37
1
#include "future.h"
2
#include "coroutine.h"
3
#include "task.h"
4
#include <assert.h>
5
#include <stdio.h>
6
#include <stdlib.h>
7
#include <time.h>
8
#include <errno.h>
9
#include <math.h>
10
#include <string.h>
11
#include "timespec_utils.h"
12
13
14
Future_vfptrs_t Future_vfptrs = {
15
&Future_dtor,
16
&_Future_Await,
17
&_Future_SetResult
18
};
19
20
typedef struct Future_WatcherSpec {
21
Future_Watcher watcher;
22
void *me;
23
} Future_WatcherSpec;
24
25
// Initializes an already-allocated Future in place: installs the default
// vtable, constructs the mutex, and puts the future in the waiting state
// with no result and an empty watcher list.
void Future_ctor(Future *fut)
{
    fut->vfptrs = &Future_vfptrs;
    _Cor_Mutex_ctor(&fut->mutex);

    // Empty watcher list; storage is allocated on first Future_AddWatcher.
    fut->watchers = NULL;
    fut->nwatchers = 0;
    fut->maxwatchers = 0;

    // No result yet.
    fut->state = Future_State_Waiting;
    fut->canceled = false;
    fut->value = NULL;
}
35
36
37
// Allocates a Future on the heap and runs Future_ctor on it.
//
// Returns the new future, or NULL if the allocation fails (the
// constructor is not run in that case). Release the result with
// Future_Delete.
Future *Future_New(void)
{
    Future *fut = malloc(sizeof *fut);
    if (fut == NULL) {
        // Out of memory: do not hand a NULL pointer to the constructor.
        return NULL;
    }
    Future_ctor(fut);
    return fut;
}
42
43
44
void Future_dtor(
45
Future *fut
46
){
8 months ago
47
_Cor_Mutex_Lock(&fut->mutex);
9 months ago
37
48
assert(fut->nwatchers == 0);
49
free(fut->watchers);
8 months ago
50
_Cor_Mutex_Unlock(&fut->mutex);
51
_Cor_Mutex_dtor(&fut->mutex);
9 months ago
37
52
}
53
54
55
void Future_Delete(
56
Future *fut
57
){
58
fut->vfptrs->dtor(fut);
59
free(fut);
60
}
61
62
63
void Future_AddWatcher(
64
Future *fut,
65
Future_Watcher watcher,
66
void *watcher_me
67
){
8 months ago
68
_Cor_Mutex_Lock(&fut->mutex);
9 months ago
37
69
if (fut->state == Future_State_Done) {
8 months ago
70
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
71
watcher(watcher_me, fut);
72
} else {
73
if (fut->nwatchers >= fut->maxwatchers) {
74
fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
75
fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
76
assert(fut->watchers);
77
}
78
fut->watchers[fut->nwatchers].watcher = watcher;
79
fut->watchers[fut->nwatchers].me = watcher_me;
80
fut->nwatchers++;
8 months ago
81
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
82
}
83
}
84
85
86
void Future_RemoveWatcher(
87
Future *fut,
88
Future_Watcher watcher,
89
void *watcher_me
90
){
8 months ago
91
_Cor_Mutex_Lock(&fut->mutex);
9 months ago
37
92
for (int i = 0; i < fut->nwatchers; i++) {
93
if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
94
fut->nwatchers--;
95
if (i < fut->nwatchers) {
96
fut->watchers[i] = fut->watchers[fut->nwatchers];
97
}
98
break;
99
}
100
}
8 months ago
101
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
102
}
103
104
105
static void _Future_Ready(
106
Future *fut
107
){
108
fut->state = Future_State_Done;
109
110
// Take note of watchers list, and reset it
111
Future_WatcherSpec *watchers = fut->watchers;
112
int nwatchers = fut->nwatchers;
113
fut->watchers = NULL;
114
fut->nwatchers = 0;
8 months ago
115
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
116
117
// notify those watchers
118
for (int i = 0; i < nwatchers; i++) {
119
watchers[i].watcher(watchers[i].me, fut);
120
}
121
free(watchers);
122
}
123
124
125
void Future_SetResult(
126
Future *fut,
127
bool canceled,
128
void *value
129
){
130
fut->vfptrs->set_result(fut, canceled, value);
131
}
132
133
4 months ago
134
Coroutine_Err Future_GetResult(
9 months ago
37
135
Future *fut,
136
void **res
137
){
4 months ago
138
if(fut->state != Future_State_Done){
139
return Coroutine_Err_WrongState;
140
}
9 months ago
37
141
if (res){
142
*res = fut->value;
143
}
4 months ago
144
return fut->canceled ? Coroutine_Err_Canceled : Coroutine_OK;
9 months ago
37
145
}
146
147
148
typedef struct future_bits {
149
Future *fut;
150
Coroutine *cor;
151
} future_bits;
152
153
154
static void future_complete(
155
void *me,
156
Future *fut
157
){
158
(void)fut;
159
Coroutine *cor = (Coroutine *)me;
160
Coroutine_Continue(cor, NULL, false);
161
}
162
163
164
static void on_yield_for_future(
165
void *me
166
){
167
future_bits *bits = (future_bits *)me;
168
Future_AddWatcher(bits->fut, future_complete, bits->cor);
169
}
170
171
172
void _Future_Await(
173
Future *fut
174
){
175
future_bits bits;
176
bits.fut = fut;
177
bits.cor = Coroutine_GetActive();
178
Task *my_task = current_task;
179
current_task = NULL;
180
my_task->awaiting_future = fut;
181
if (my_task->canceled){
182
Future_SetResult(fut, true, my_task->cancel_value);
183
}
184
Coroutine_Yield(NULL, on_yield_for_future, &bits);
185
my_task->awaiting_future = NULL;
186
current_task = my_task;
187
}
188
189
190
void _Future_SetResult(
191
Future *fut,
192
bool canceled,
193
void *res
194
){
8 months ago
195
_Cor_Mutex_Lock(&fut->mutex);
9 months ago
37
196
if(fut->state == Future_State_Waiting){
197
fut->canceled = canceled;
198
fut->value = res;
199
_Future_Ready(fut);
200
} else {
8 months ago
201
_Cor_Mutex_Unlock(&fut->mutex);
9 months ago
37
202
}
203
}
204
205
4 months ago
206
Coroutine_Err Future_Await(
9 months ago
37
207
Future *fut,
208
void **res
209
){
210
fut->vfptrs->await(fut);
211
if (res){
212
*res = fut->value;
213
}
4 months ago
214
return fut->canceled ? Coroutine_Err_Canceled : Coroutine_OK;
9 months ago
37
215
}
216