Information Technology Reference
In-Depth Information
Table 2. Distributed algorithm for π-spaces/π-channels
Client API
39
check_restore (pipe_id):
1
pi_attach (n):
40
query π-Server for status of migrated reader
2
send 〈 get, n, my_id 〉 to π-Server
41
if migrated reader found:
3
recv 〈 get, &pipe_id 〉 from π-Server
42
fd ← connect to migrated reader
4
id ← free entry on local id_tab
43
update fd_list with new fd
5
if buffer for pipe_id does not exist:
44
remove reader from hold_list
6
create buffer for pipe_id
Algorithm of the π-server
7
associate pipe_id & buffer with id
8
return id
45
On 〈 get, &n, &src 〉:
46
entry ← find n on π-table
9
pi_create (n):
47
if entry does not exist:
10
send 〈 put, n, my_id 〉 to π-Server
48
entry ← create n on π-table
11
recv 〈 get, &pipe_id, &dest_list 〉 from π-Server
entry.reader ← src
12
foreach dest in dest_list:
50
entry.has_reader ← true
13
fd_list [dest] ← connect to dest
51
pipe_id ← entry.pipe_id
14
id ← free entry on local id_tab
52
send 〈 get, pipe_id 〉 to src
15
associate fd_list & pipe_id with id
53
if entry.is_cached:
16
return id
54
initiate forwarding to src
17
pi_read (id, m, len):
55
On 〈 put, &n, &src 〉:
18
off ← compute offset
56
entry ← find n on π-table
19
buffer ← retrieve buffer for id
57
if entry does not exist:
20
block until seg [ off, len ] in buffer || eoc
58
entry ← create n on π-table
21
if segment was found:
59
dest_list ← ()
22
get seg [ off, len ] from buffer
60
if entry.has_reader:
23
store segment into m
61
append entry.reader to dest_list
24
return len
62
append my_id to dest_list
25
return -1 /*end of channel */
63
pipe_id ← entry.pipe_id
64
entry.is_cached ← true
26
pi_write (id, m, len):
65
send 〈 put, pipe_id, dest_list 〉 to src
27
pipe_id ← map pipe_id from id
Algorithm of the in-bound thread
28
fd_list ← retrieve fd_list for id
29
off ← compute offset
66
On CON 〈 &src, &pipe_id 〉:
30
success_count ← 0
67
fd ← accept inbound connection
31
foreach dest in fd_list:
68
buffer ← retrieve buffer for pipe_id
32
write SEG 〈 pipe_id, m, off, len 〉 to dest
69
if buffer does not exist:
33
if write successful:
70
create buffer for pipe_id
34
success_count++
71
associate fd with pipe_id
35
if success_count < len(fd_list):
36
check_restore(pipe_id)
72
On SEG 〈 &pipe_id, &m, &off, &len 〉:
37
update status of pipe_id
73
buffer ← retrieve buffer for pipe_id
38
return len
74
store m at offset off
continued on following page
Search WWH ::




Custom Search