1 contributor
232 lines4.7 KB
1#include "future.h"
2#include "coroutine.h"
3#include "task.h"
4#include <assert.h>
5#include <stdio.h>
6#include <stdlib.h>
7#include <time.h>
8#include <errno.h>
9#include "cor_thread_local.h"
10#include <math.h>
11#include <string.h>
12#include "timespec_utils.h"
13
14
15Future_vfptrs_t Future_vfptrs = {
16 &Future_dtor,
17 &_Future_Await,
18 &_Future_SetResult
19};
20
21typedef struct Future_WatcherSpec {
22 Future_Watcher watcher;
23 void *me;
24} Future_WatcherSpec;
25
26void Future_ctor(Future *fut){
27 fut->vfptrs = &Future_vfptrs;
28 int r = pthread_mutex_init(&fut->mutex, NULL);
29 assert(r == 0);
30 fut->state = Future_State_Waiting;
31 fut->value = NULL;
32 fut->canceled = false;
33 fut->watchers = NULL;
34 fut->nwatchers = 0;
35 fut->maxwatchers = 0;
36}
37
38
39Future *Future_New(){
40 Future *fut = malloc(sizeof(Future));
41 Future_ctor(fut);
42 return fut;
43}
44
45
46void Future_dtor(
47 Future *fut
48){
49 int r;
50
51 r = pthread_mutex_lock(&fut->mutex);
52 assert(r == 0);
53 assert(fut->nwatchers == 0);
54 free(fut->watchers);
55 r = pthread_mutex_unlock(&fut->mutex);
56 assert(r == 0);
57
58 r = pthread_mutex_destroy(&fut->mutex);
59 assert(r == 0);
60}
61
62
63void Future_Delete(
64 Future *fut
65){
66 fut->vfptrs->dtor(fut);
67 free(fut);
68}
69
70
71void Future_AddWatcher(
72 Future *fut,
73 Future_Watcher watcher,
74 void *watcher_me
75){
76 int r = pthread_mutex_lock(&fut->mutex);
77 assert(r == 0);
78 if (fut->state == Future_State_Done) {
79 r = pthread_mutex_unlock(&fut->mutex);
80 assert(r == 0);
81 watcher(watcher_me, fut);
82 } else {
83 if (fut->nwatchers >= fut->maxwatchers) {
84 fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
85 fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
86 assert(fut->watchers);
87 }
88 fut->watchers[fut->nwatchers].watcher = watcher;
89 fut->watchers[fut->nwatchers].me = watcher_me;
90 fut->nwatchers++;
91 r = pthread_mutex_unlock(&fut->mutex);
92 assert(r == 0);
93 }
94}
95
96
97void Future_RemoveWatcher(
98 Future *fut,
99 Future_Watcher watcher,
100 void *watcher_me
101){
102 int r = pthread_mutex_lock(&fut->mutex);
103 assert(r == 0);
104 for (int i = 0; i < fut->nwatchers; i++) {
105 if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
106 fut->nwatchers--;
107 if (i < fut->nwatchers) {
108 fut->watchers[i] = fut->watchers[fut->nwatchers];
109 }
110 break;
111 }
112 }
113 r = pthread_mutex_unlock(&fut->mutex);
114 assert(r == 0);
115}
116
117
118static void _Future_Ready(
119 Future *fut
120){
121 int r;
122 fut->state = Future_State_Done;
123
124 // Take note of watchers list, and reset it
125 Future_WatcherSpec *watchers = fut->watchers;
126 int nwatchers = fut->nwatchers;
127 fut->watchers = NULL;
128 fut->nwatchers = 0;
129 r = pthread_mutex_unlock(&fut->mutex);
130 assert(r == 0);
131
132 // notify those watchers
133 for (int i = 0; i < nwatchers; i++) {
134 watchers[i].watcher(watchers[i].me, fut);
135 }
136 free(watchers);
137}
138
139
140void Future_SetResult(
141 Future *fut,
142 bool canceled,
143 void *value
144){
145 fut->vfptrs->set_result(fut, canceled, value);
146}
147
148
149bool Future_GetResult(
150 Future *fut,
151 void **res
152){
153 assert(fut->state == Future_State_Done);
154 if (res){
155 *res = fut->value;
156 }
157 return fut->canceled;
158}
159
160
161typedef struct future_bits {
162 Future *fut;
163 Coroutine *cor;
164} future_bits;
165
166
167static void future_complete(
168 void *me,
169 Future *fut
170){
171 (void)fut;
172 Coroutine *cor = (Coroutine *)me;
173 Coroutine_Continue(cor, NULL, false);
174}
175
176
177static void on_yield_for_future(
178 void *me
179){
180 future_bits *bits = (future_bits *)me;
181 Future_AddWatcher(bits->fut, future_complete, bits->cor);
182}
183
184
185void _Future_Await(
186 Future *fut
187){
188 future_bits bits;
189 bits.fut = fut;
190 bits.cor = Coroutine_GetActive();
191 Task *my_task = current_task;
192 current_task = NULL;
193 my_task->awaiting_future = fut;
194 if (my_task->canceled){
195 Future_SetResult(fut, true, my_task->cancel_value);
196 }
197 Coroutine_Yield(NULL, on_yield_for_future, &bits);
198 my_task->awaiting_future = NULL;
199 current_task = my_task;
200}
201
202
203void _Future_SetResult(
204 Future *fut,
205 bool canceled,
206 void *res
207){
208 int r;
209 r = pthread_mutex_lock(&fut->mutex);
210 assert(r == 0);
211 if(fut->state == Future_State_Waiting){
212 fut->canceled = canceled;
213 fut->value = res;
214 _Future_Ready(fut);
215 } else {
216 r = pthread_mutex_unlock(&fut->mutex);
217 assert(r == 0);
218 }
219}
220
221
222bool Future_Await(
223 Future *fut,
224 void **res
225){
226 fut->vfptrs->await(fut);
227 if (res){
228 *res = fut->value;
229 }
230 return fut->canceled;
231}
232