214 lines4.3 KB
Newer
Older
-
+
commited
{line.log.rev}
on
8 months ago
37
1
#include "future.h"
2
#include "coroutine.h"
3
#include "task.h"
4
#include <assert.h>
5
#include <stdio.h>
6
#include <stdlib.h>
7
#include <time.h>
8
#include <errno.h>
9
#include <math.h>
10
#include <string.h>
11
#include "timespec_utils.h"
12
13
14
// Virtual function table installed by Future_ctor() on every plain Future.
// The entries are positional, so their order has to match the member order
// of Future_vfptrs_t (declared in a header that is not part of this file).
Future_vfptrs_t Future_vfptrs = {
    Future_dtor,        // destructor, reached through vfptrs->dtor
    _Future_Await,      // default await implementation
    _Future_SetResult   // default set-result implementation
};
19
20
typedef struct Future_WatcherSpec {
21
Future_Watcher watcher;
22
void *me;
23
} Future_WatcherSpec;
24
25
// Initialise an already-allocated Future in place.
//
// After this call the future uses the base vtable, owns a constructed mutex,
// is in the waiting state with no value, is not canceled, and has an empty
// watcher list (the watcher array is allocated lazily on first use).
void Future_ctor(Future *fut)
{
    fut->vfptrs = &Future_vfptrs;
    _Cor_Mutex_ctor(&fut->mutex);

    // No watchers registered yet.
    fut->watchers = NULL;
    fut->nwatchers = 0;
    fut->maxwatchers = 0;

    // No result yet.
    fut->state = Future_State_Waiting;
    fut->canceled = false;
    fut->value = NULL;
}
35
36
37
// Allocate a Future on the heap and run Future_ctor() on it.
//
// Returns the new future, or NULL when the allocation fails (the
// constructor is not run in that case, so no NULL pointer is dereferenced).
// Pair with Future_Delete(), which runs the destructor and frees the memory.
Future *Future_New(void)
{
    Future *fut = malloc(sizeof *fut);
    if (fut == NULL) {
        return NULL;
    }
    Future_ctor(fut);
    return fut;
}
42
43
44
void Future_dtor(
45
Future *fut
46
){
7 months ago
47
_Cor_Mutex_Lock(&fut->mutex);
8 months ago
37
48
assert(fut->nwatchers == 0);
49
free(fut->watchers);
7 months ago
50
_Cor_Mutex_Unlock(&fut->mutex);
51
_Cor_Mutex_dtor(&fut->mutex);
8 months ago
37
52
}
53
54
55
void Future_Delete(
56
Future *fut
57
){
58
fut->vfptrs->dtor(fut);
59
free(fut);
60
}
61
62
63
void Future_AddWatcher(
64
Future *fut,
65
Future_Watcher watcher,
66
void *watcher_me
67
){
7 months ago
68
_Cor_Mutex_Lock(&fut->mutex);
8 months ago
37
69
if (fut->state == Future_State_Done) {
7 months ago
70
_Cor_Mutex_Unlock(&fut->mutex);
8 months ago
37
71
watcher(watcher_me, fut);
72
} else {
73
if (fut->nwatchers >= fut->maxwatchers) {
74
fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
75
fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
76
assert(fut->watchers);
77
}
78
fut->watchers[fut->nwatchers].watcher = watcher;
79
fut->watchers[fut->nwatchers].me = watcher_me;
80
fut->nwatchers++;
7 months ago
81
_Cor_Mutex_Unlock(&fut->mutex);
8 months ago
37
82
}
83
}
84
85
86
void Future_RemoveWatcher(
87
Future *fut,
88
Future_Watcher watcher,
89
void *watcher_me
90
){
7 months ago
91
_Cor_Mutex_Lock(&fut->mutex);
8 months ago
37
92
for (int i = 0; i < fut->nwatchers; i++) {
93
if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
94
fut->nwatchers--;
95
if (i < fut->nwatchers) {
96
fut->watchers[i] = fut->watchers[fut->nwatchers];
97
}
98
break;
99
}
100
}
7 months ago
101
_Cor_Mutex_Unlock(&fut->mutex);
8 months ago
37
102
}
103
104
105
static void _Future_Ready(
106
Future *fut
107
){
108
fut->state = Future_State_Done;
109
110
// Take note of watchers list, and reset it
111
Future_WatcherSpec *watchers = fut->watchers;
112
int nwatchers = fut->nwatchers;
113
fut->watchers = NULL;
114
fut->nwatchers = 0;
7 months ago
115
_Cor_Mutex_Unlock(&fut->mutex);
8 months ago
37
116
117
// notify those watchers
118
for (int i = 0; i < nwatchers; i++) {
119
watchers[i].watcher(watchers[i].me, fut);
120
}
121
free(watchers);
122
}
123
124
125
void Future_SetResult(
126
Future *fut,
127
bool canceled,
128
void *value
129
){
130
fut->vfptrs->set_result(fut, canceled, value);
131
}
132
133
134
bool Future_GetResult(
135
Future *fut,
136
void **res
137
){
138
assert(fut->state == Future_State_Done);
139
if (res){
140
*res = fut->value;
141
}
142
return fut->canceled;
143
}
144
145
146
typedef struct future_bits {
147
Future *fut;
148
Coroutine *cor;
149
} future_bits;
150
151
152
static void future_complete(
153
void *me,
154
Future *fut
155
){
156
(void)fut;
157
Coroutine *cor = (Coroutine *)me;
158
Coroutine_Continue(cor, NULL, false);
159
}
160
161
162
static void on_yield_for_future(
163
void *me
164
){
165
future_bits *bits = (future_bits *)me;
166
Future_AddWatcher(bits->fut, future_complete, bits->cor);
167
}
168
169
170
void _Future_Await(
171
Future *fut
172
){
173
future_bits bits;
174
bits.fut = fut;
175
bits.cor = Coroutine_GetActive();
176
Task *my_task = current_task;
177
current_task = NULL;
178
my_task->awaiting_future = fut;
179
if (my_task->canceled){
180
Future_SetResult(fut, true, my_task->cancel_value);
181
}
182
Coroutine_Yield(NULL, on_yield_for_future, &bits);
183
my_task->awaiting_future = NULL;
184
current_task = my_task;
185
}
186
187
188
void _Future_SetResult(
189
Future *fut,
190
bool canceled,
191
void *res
192
){
7 months ago
193
_Cor_Mutex_Lock(&fut->mutex);
8 months ago
37
194
if(fut->state == Future_State_Waiting){
195
fut->canceled = canceled;
196
fut->value = res;
197
_Future_Ready(fut);
198
} else {
7 months ago
199
_Cor_Mutex_Unlock(&fut->mutex);
8 months ago
37
200
}
201
}
202
203
204
bool Future_Await(
205
Future *fut,
206
void **res
207
){
208
fut->vfptrs->await(fut);
209
if (res){
210
*res = fut->value;
211
}
212
return fut->canceled;
213
}
214