214 lines4.3 KB
1#include "future.h"
2#include "coroutine.h"
3#include "task.h"
4#include <assert.h>
5#include <stdio.h>
6#include <stdlib.h>
7#include <time.h>
8#include <errno.h>
9#include <math.h>
10#include <string.h>
11#include "timespec_utils.h"
12
13
14Future_vfptrs_t Future_vfptrs = {
15 &Future_dtor,
16 &_Future_Await,
17 &_Future_SetResult
18};
19
20typedef struct Future_WatcherSpec {
21 Future_Watcher watcher;
22 void *me;
23} Future_WatcherSpec;
24
25void Future_ctor(Future *fut){
26 fut->vfptrs = &Future_vfptrs;
27 _Cor_Mutex_ctor(&fut->mutex);
28 fut->state = Future_State_Waiting;
29 fut->value = NULL;
30 fut->canceled = false;
31 fut->watchers = NULL;
32 fut->nwatchers = 0;
33 fut->maxwatchers = 0;
34}
35
36
37Future *Future_New(){
38 Future *fut = malloc(sizeof(Future));
39 Future_ctor(fut);
40 return fut;
41}
42
43
44void Future_dtor(
45 Future *fut
46){
47 _Cor_Mutex_Lock(&fut->mutex);
48 assert(fut->nwatchers == 0);
49 free(fut->watchers);
50 _Cor_Mutex_Unlock(&fut->mutex);
51 _Cor_Mutex_dtor(&fut->mutex);
52}
53
54
55void Future_Delete(
56 Future *fut
57){
58 fut->vfptrs->dtor(fut);
59 free(fut);
60}
61
62
63void Future_AddWatcher(
64 Future *fut,
65 Future_Watcher watcher,
66 void *watcher_me
67){
68 _Cor_Mutex_Lock(&fut->mutex);
69 if (fut->state == Future_State_Done) {
70 _Cor_Mutex_Unlock(&fut->mutex);
71 watcher(watcher_me, fut);
72 } else {
73 if (fut->nwatchers >= fut->maxwatchers) {
74 fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
75 fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
76 assert(fut->watchers);
77 }
78 fut->watchers[fut->nwatchers].watcher = watcher;
79 fut->watchers[fut->nwatchers].me = watcher_me;
80 fut->nwatchers++;
81 _Cor_Mutex_Unlock(&fut->mutex);
82 }
83}
84
85
86void Future_RemoveWatcher(
87 Future *fut,
88 Future_Watcher watcher,
89 void *watcher_me
90){
91 _Cor_Mutex_Lock(&fut->mutex);
92 for (int i = 0; i < fut->nwatchers; i++) {
93 if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
94 fut->nwatchers--;
95 if (i < fut->nwatchers) {
96 fut->watchers[i] = fut->watchers[fut->nwatchers];
97 }
98 break;
99 }
100 }
101 _Cor_Mutex_Unlock(&fut->mutex);
102}
103
104
105static void _Future_Ready(
106 Future *fut
107){
108 fut->state = Future_State_Done;
109
110 // Take note of watchers list, and reset it
111 Future_WatcherSpec *watchers = fut->watchers;
112 int nwatchers = fut->nwatchers;
113 fut->watchers = NULL;
114 fut->nwatchers = 0;
115 _Cor_Mutex_Unlock(&fut->mutex);
116
117 // notify those watchers
118 for (int i = 0; i < nwatchers; i++) {
119 watchers[i].watcher(watchers[i].me, fut);
120 }
121 free(watchers);
122}
123
124
125void Future_SetResult(
126 Future *fut,
127 bool canceled,
128 void *value
129){
130 fut->vfptrs->set_result(fut, canceled, value);
131}
132
133
134bool Future_GetResult(
135 Future *fut,
136 void **res
137){
138 assert(fut->state == Future_State_Done);
139 if (res){
140 *res = fut->value;
141 }
142 return fut->canceled;
143}
144
145
146typedef struct future_bits {
147 Future *fut;
148 Coroutine *cor;
149} future_bits;
150
151
152static void future_complete(
153 void *me,
154 Future *fut
155){
156 (void)fut;
157 Coroutine *cor = (Coroutine *)me;
158 Coroutine_Continue(cor, NULL, false);
159}
160
161
162static void on_yield_for_future(
163 void *me
164){
165 future_bits *bits = (future_bits *)me;
166 Future_AddWatcher(bits->fut, future_complete, bits->cor);
167}
168
169
170void _Future_Await(
171 Future *fut
172){
173 future_bits bits;
174 bits.fut = fut;
175 bits.cor = Coroutine_GetActive();
176 Task *my_task = current_task;
177 current_task = NULL;
178 my_task->awaiting_future = fut;
179 if (my_task->canceled){
180 Future_SetResult(fut, true, my_task->cancel_value);
181 }
182 Coroutine_Yield(NULL, on_yield_for_future, &bits);
183 my_task->awaiting_future = NULL;
184 current_task = my_task;
185}
186
187
188void _Future_SetResult(
189 Future *fut,
190 bool canceled,
191 void *res
192){
193 _Cor_Mutex_Lock(&fut->mutex);
194 if(fut->state == Future_State_Waiting){
195 fut->canceled = canceled;
196 fut->value = res;
197 _Future_Ready(fut);
198 } else {
199 _Cor_Mutex_Unlock(&fut->mutex);
200 }
201}
202
203
204bool Future_Await(
205 Future *fut,
206 void **res
207){
208 fut->vfptrs->await(fut);
209 if (res){
210 *res = fut->value;
211 }
212 return fut->canceled;
213}
214