1 contributor
231 lines4.6 KB
Newer
Older
-
+
committed
{line.log.rev}
on
9 months ago
37
1
#include "future.h"
2
#include "coroutine.h"
3
#include "task.h"
4
#include <assert.h>
5
#include <stdio.h>
6
#include <stdlib.h>
7
#include <time.h>
8
#include <errno.h>
9
#include <math.h>
10
#include <string.h>
11
#include "timespec_utils.h"
12
13
14
Future_vfptrs_t Future_vfptrs = {
15
&Future_dtor,
16
&_Future_Await,
17
&_Future_SetResult
18
};
19
20
typedef struct Future_WatcherSpec {
21
Future_Watcher watcher;
22
void *me;
23
} Future_WatcherSpec;
24
25
/*
 * Initialise a Future in place.
 *
 * Installs the base vtable, creates the mutex with default attributes, and
 * leaves the future in the waiting state with no value, not canceled, and an
 * empty watcher list.  Mutex creation failure is caught by assert().
 */
void Future_ctor(Future *fut)
{
    fut->vfptrs = &Future_vfptrs;

    int rc = pthread_mutex_init(&fut->mutex, NULL);
    assert(rc == 0);

    /* No result yet. */
    fut->state = Future_State_Waiting;
    fut->canceled = false;
    fut->value = NULL;

    /* Empty watcher array; storage is obtained when a watcher is added. */
    fut->maxwatchers = 0;
    fut->nwatchers = 0;
    fut->watchers = NULL;
}
36
37
38
/*
 * Allocate a Future on the heap and initialise it with Future_ctor.
 *
 * Returns the new future, or NULL if the allocation fails (the constructor
 * is not run in that case).  Release the result with Future_Delete.
 */
Future *Future_New(void)
{
    Future *fut = malloc(sizeof *fut);
    if (fut == NULL) {
        /* Previously the constructor would dereference NULL here. */
        return NULL;
    }
    Future_ctor(fut);
    return fut;
}
43
44
45
void Future_dtor(
46
Future *fut
47
){
48
int r;
49
50
r = pthread_mutex_lock(&fut->mutex);
51
assert(r == 0);
52
assert(fut->nwatchers == 0);
53
free(fut->watchers);
54
r = pthread_mutex_unlock(&fut->mutex);
55
assert(r == 0);
56
57
r = pthread_mutex_destroy(&fut->mutex);
58
assert(r == 0);
59
}
60
61
62
void Future_Delete(
63
Future *fut
64
){
65
fut->vfptrs->dtor(fut);
66
free(fut);
67
}
68
69
70
void Future_AddWatcher(
71
Future *fut,
72
Future_Watcher watcher,
73
void *watcher_me
74
){
75
int r = pthread_mutex_lock(&fut->mutex);
76
assert(r == 0);
77
if (fut->state == Future_State_Done) {
78
r = pthread_mutex_unlock(&fut->mutex);
79
assert(r == 0);
80
watcher(watcher_me, fut);
81
} else {
82
if (fut->nwatchers >= fut->maxwatchers) {
83
fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
84
fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
85
assert(fut->watchers);
86
}
87
fut->watchers[fut->nwatchers].watcher = watcher;
88
fut->watchers[fut->nwatchers].me = watcher_me;
89
fut->nwatchers++;
90
r = pthread_mutex_unlock(&fut->mutex);
91
assert(r == 0);
92
}
93
}
94
95
96
void Future_RemoveWatcher(
97
Future *fut,
98
Future_Watcher watcher,
99
void *watcher_me
100
){
101
int r = pthread_mutex_lock(&fut->mutex);
102
assert(r == 0);
103
for (int i = 0; i < fut->nwatchers; i++) {
104
if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
105
fut->nwatchers--;
106
if (i < fut->nwatchers) {
107
fut->watchers[i] = fut->watchers[fut->nwatchers];
108
}
109
break;
110
}
111
}
112
r = pthread_mutex_unlock(&fut->mutex);
113
assert(r == 0);
114
}
115
116
117
static void _Future_Ready(
118
Future *fut
119
){
120
int r;
121
fut->state = Future_State_Done;
122
123
// Take note of watchers list, and reset it
124
Future_WatcherSpec *watchers = fut->watchers;
125
int nwatchers = fut->nwatchers;
126
fut->watchers = NULL;
127
fut->nwatchers = 0;
128
r = pthread_mutex_unlock(&fut->mutex);
129
assert(r == 0);
130
131
// notify those watchers
132
for (int i = 0; i < nwatchers; i++) {
133
watchers[i].watcher(watchers[i].me, fut);
134
}
135
free(watchers);
136
}
137
138
139
void Future_SetResult(
140
Future *fut,
141
bool canceled,
142
void *value
143
){
144
fut->vfptrs->set_result(fut, canceled, value);
145
}
146
147
148
bool Future_GetResult(
149
Future *fut,
150
void **res
151
){
152
assert(fut->state == Future_State_Done);
153
if (res){
154
*res = fut->value;
155
}
156
return fut->canceled;
157
}
158
159
160
typedef struct future_bits {
161
Future *fut;
162
Coroutine *cor;
163
} future_bits;
164
165
166
static void future_complete(
167
void *me,
168
Future *fut
169
){
170
(void)fut;
171
Coroutine *cor = (Coroutine *)me;
172
Coroutine_Continue(cor, NULL, false);
173
}
174
175
176
static void on_yield_for_future(
177
void *me
178
){
179
future_bits *bits = (future_bits *)me;
180
Future_AddWatcher(bits->fut, future_complete, bits->cor);
181
}
182
183
184
void _Future_Await(
185
Future *fut
186
){
187
future_bits bits;
188
bits.fut = fut;
189
bits.cor = Coroutine_GetActive();
190
Task *my_task = current_task;
191
current_task = NULL;
192
my_task->awaiting_future = fut;
193
if (my_task->canceled){
194
Future_SetResult(fut, true, my_task->cancel_value);
195
}
196
Coroutine_Yield(NULL, on_yield_for_future, &bits);
197
my_task->awaiting_future = NULL;
198
current_task = my_task;
199
}
200
201
202
void _Future_SetResult(
203
Future *fut,
204
bool canceled,
205
void *res
206
){
207
int r;
208
r = pthread_mutex_lock(&fut->mutex);
209
assert(r == 0);
210
if(fut->state == Future_State_Waiting){
211
fut->canceled = canceled;
212
fut->value = res;
213
_Future_Ready(fut);
214
} else {
215
r = pthread_mutex_unlock(&fut->mutex);
216
assert(r == 0);
217
}
218
}
219
220
221
bool Future_Await(
222
Future *fut,
223
void **res
224
){
225
fut->vfptrs->await(fut);
226
if (res){
227
*res = fut->value;
228
}
229
return fut->canceled;
230
}
231