body name библиотека global Matrix
imports (достъп по име) … var m[N, N] := … end
decl., proc … resource f()
final code imports node, Matrix
end name var x: node.node; if x …
Matrix.m[3,4] :=: … end
Възможности за паралелизъм
- динамично генерирани процеси - дистанционен call
- семафори - rendezvous механизъм
- отлагане на съобщения - input (receive)
-
разнообразие на средства за връщане на управлението
= оп като интерфейс между процеси (абстрактен)
= ресурсът като основна структура за разпределена обработка
= комуникацията между процеси чрез: общи пром., общи ор
input механизъм call, send на op
a) b) c) d)
call call send send
proc in proc in
end ni end ni
дистанционен rendezvous динамично асинхронен
(обикновен) call механизъм създ. процес обмен съобщения
Процеси създавани в run-time
-
process name [(квалификатор …)] пр: resource mult()
тяло const N:=20; var a[N,N], b[N,N],
c[N,N] : real
end … process multiply( I:=1 to N, j:=1 to N)
var temp := 0.0
fa k:=1 to N temp +:=a[I,k]*b[k,j] af
c[I,j] := temp end
- недетерминираност, приключване. final … end end
II. Независима форма на процес: при известен брой
при изпълнение в и извън start-up код
op декл., proc, {send}
динамично генериране на паралелни процеси
procedure call пр: resource f()
process send var x:=0; op P1() {send}
proc call send P1() #fork към P1
send proc P1()
пр: resource compressor x+:=3 end ….
op compress(..) op P2() {send}
body compressor() send P2() #fork към P2
proc compress() proc P2()
…. end; end x+:=4;… end; end
var c:=cap compressor
c:= create compressor()
… многократно генериране
send c.compress(..) I на ресурса
send c.compress(..)… II ..N
-
Генериране на процеси с:
co <команда> || … oc
call извикване (на процедура)
sned (proc)
:= потребителска ф-ия
пр: co p(3) || q() || a:= z(x,y) oc
пр: cnt := 0
co(I:=1 to N st a[I] != 0) p(I) cnt++; write(cnt, I) oc
Асинхронно предаване на съобщения
нов процес (proc) {send} send in
op процедура {call}
опашка от съобщения send по-ниско ниво от
call семафори
receive име_op [квалификатор] (параметри)
пр: resource merge process merge
const EOS:= high(int) var v1, v2: int
op stream1(x:int), stream2(x:int) receive stream1(v1); receive stream2(v2)
process one do v1 < EOS or v2 < EOS
… send stream1(y) if v1 <= v2 write(v1);
send stream1(EOS); end receive(stream1(v1)
process two [ ] v2 < v1 write(v2);
… send stream2(y) receive stream2(v2); fi
… send stream2(EOS);end od; write(EOS); end; end
клиент – сървър
пр: 1 1 1 N
resource cs1() resource cs2()
op request(..), results(..) op request(id:int,..), results[N](..)
process client process client (I:=1 to N)
… …
send request(..) send request(I,…)
… end …
process server receive result[I](..)
do true … end
receive request(..) process server
… send results(..) do true receive request(id,..)
od; end; end … send results[id](..); od;end
1 x прибл.== от страна кл. x x
resource cs3() resource cs4()
optype results_type:= (..) optypr results_type :=(..)
op request(results_cap:cap results_type,..) const N:= 20
const N:=20 op request(results_cap:cap results_type,..)
process client(I:=1 to N) process client(I:=1 to N)
op results: results_type op results: results_type
… send request(results,..) може … send request(results,..)
… receive results(..); …; end различни … receive(results(..) … end
process server resources proc request(results_cap,..)
do true var results_cap: (request в … send results_cap(..)
cap results_type spec на server) end; end
reseive request(results_cap,..)
… send results_cap(..); od;end
Аналогия – семафори асинхронен обмен на съобщения
P(s); V(s) send; receive
sem s op s(){send} 1. без параметри
P(s) receive s() 2. {send}
V(s) sned s( )
sem s:= израз op s(){send}; var e:=израз
fa I:=1 to e send s() af
пр: resource cs1() process p(I:=1 to N)
const N:= 20 опашка # некритична секция
op mutex() {send} от съобщения … # критична секция
send mutex() #инициализира receive mutex()
var x:=20 x:=x+1; send mutex()
# некритична секция
Семафори, съдържащи данни
семафор синхрон. + данни (параметри) безкраен буфер за съобщ.
пр: resource main() fa I:=1 to B send pool(I) af #подава В
op pool(index: int) #семафор + индекс #съобщ. в опашка, т.е
const B := 20 #буфери #буфер “I” е свободен
const N:=20 # процеси process p(I:= 1 to N)
var buffer[1 : B] :T #всеки буфер … receive pool(x) #заявка за буфер
#може да е и масив # + изчакване за достъп до буф.”x”
… send pool(x) #освобождава буф. x
не е нужен list на свободните буфери … end; end
Дистанционно обръщение към proc.
- синхронизация call proc
- нов процес (динамично) в опделен ресурс
I. Директно извикване чрез call end
пр: resource Stack
type results := enum(OK,OVERFLOW,.)
op push(item: int) returns r: result resource Stack_User()
op pop(res item: int) returns r: result import Stack
body Stack(Size : int) var x :Stack.result
var store[1:size]: int, top: int :=0 var S1,S2: cap Stack
proc push(item) returns r var y: int
if top < sizestore[++top]:=item S1:= create Stack(20)
r:=OK [ ] top = size r:= OVERFLOW S2:= create Stack(10)
fi; end … S1.push(4); S2.push(..)
proc pop(item) returns r .. if(x:=S1.pop(y) != OK…
…. end; end Stack …. end
проблем с охрана на достъп от конфликти (при > 1 напр. push)- семафори
-
Създаване на процеси чрез send/receive (предимства и недостатъци)
op p(val x:int, var y:int,res z:int) op p(x,y:int,r(y,z:int)
process q process q
… call p(a,b,c)… end … send p(a,b)
proc p(x,y,z) … receive r(b,c) … end
z:=…; … proc p(x,y); var z: int опашка
end z:=…; send r(y,z); end
-
при много процеси: с call няма промяна (проблем с конфликти)
със send: op p(x,y:int;rcap: cap(y,z:int)) receive r(b,c)
process q(I:=1 to …) end
… proc p(x,y,rcap)
op r(y,z: int) # отделни опашки …. z:= …
send p(a,b,r) send rcap(y,z) ; end
Завършване на proc
- return send - reply call
call send
1 пр: resource f() пр: resource conversation()
proc op fo(x: int) returns y:int op server(n:int) returns
2 process p c:cap(s:string[10])
3 … z:=fo(10);… process client(I:=1 to…)
3 end var t[20]: string[10]
proc fo(x) returns y …var c:cap(s:string[10])
y:= x* 8 c:= server(20)
reply 1.създава
y:=0; end; end 2.предава брой редове
3.получава указател
към опашка
client server fa I:=1 to 20 send c(t[I]) af
end
създава proc server(n) returns c
reply op line(s: string[10]); c:= line;reply
fa I := 1 to n var s: string[10]
receive line(s); …
af ; end; end
-
forward op(параметри,..) p() f() g()
= с еднаква сигнатура
= връщана стойност от последния
= повторен forward- както send
пр: op f(x:int) returns z:int proc f(x) returns z
op g(y:int) returns z:int forward g(2 * x)
process p # променя се “x”
var a:= f(1); write(a) end
end proc g(y) returns z
пр: client диспечер сървъри z :=y+10; end
Rendezvous механизъм (разширен)
-
няма нов процес;
-
синхронност;
-
call op… input, receive in{op(параметри)[returns име]
-
блокировка на процес; [st израз_за_синхрон.][by израз_за_
-
използване на парам. при избора диспечер.] блок [ ]…} ni
-
приключва при изпълнение на 1 блок
-
[ ] else блок липсва блокировка както select/ accept в Ada
(но по-богат)
пр: resource main() process q
op f(x:int),g(n:real) returns v:real var z: int; …
process p1 in f(x)z+:=x[ ] g(n) returns
var y:int;… call f(y)… end v:= n*n-2.2 ; ni ;… end; end
process p2 (само 1 ако няма цикъл;
var w:real;… w:=g(3.8) … end ## недетерминиран ред##)
receive in
receive f(v1,v2) приблизително еквив. in f(p1,p2)v1:=p1; v2:=p2 ni
пр: process A process B call
… call foo(v) … receive foo(x) send
… end … end както rendezvous в CSP
пр: call call send
receive request(); receive release()
- различия receive in 1. >1 op
2. резултат (двупосочност)
3. синхронизация и диспечеризация
-
израз за синхронизация: in a(x) st x=3…[ ] b(y,z) st y>f(z) ni
пр: resource buffer in deposit(item) st count
op deposit(item: int) buf[rear]:= item…
op fetch() returns item: int [ ] fetch() returns item
body buffer(size:int) изчаква st count>0 item:=buf[front]…ni
var buf[0:size-1]:int od; end
var count:=0,front:=0,rear:=0
process worker; do true
проблемът: читатели/писатели:
process RW_allocator
var nr:=0, nw:=0 call
do true in start_read() st nw=0nr++ call(send)
[ ] end_read()nr—[ ] start_write() st nr=0 and nw=0nw++ предимство
[ ] end_write() nw--; ni; od; end на читатели (ако са много,
може винаги nr!=0)
- израз за диспечеризация пр: in a(x) by x .. ni
= проверява: стойност и най-старо.
= трудна реализация
пр: диспечер заявки: do truein request(usage_time) by usage_time…
Обядващите философи
I централизирано решение P5 P1
resource main() S
import Philosopher, Servant P4 P2
var n,t:int
# input No philosophers(n), sessions(t) P3
var s: cap Servant; s:=create Servant(n)
fa I:=1 to ncreate philosopher(s,I,t) af; end
resource Servant resource Philosopher
op getforks(id:int) import Servant
op relforks(id:int) body Philosopher(s:cap Servant; id,
body Servant(n:int) t: int)
process Server process Phil
var eating[1:n]:=([n] false) fa I:=1 to ts.getforks(id)
do true write(“Philosopher”,id,”is eating)
in getforks(id) st not eating[(id mod n)+1] s.relforks(id)
and not eating[((id-2)mod n)+1] write(“Phil.”,id,”is thinking”)
eating[id]:=true [ ] relforks(id) deadlock free: винаги 2 вилици
eating[id]:=false; ni; od; end; end проверка двамата съседи
-
възможни конспирации
-
възможни задръствания в общия сървър
II Разпределен алгоритъм P1 II заявки за
resource main() I S2 P2 вилици
import Philosopher, Servant I S1 S3
var n,t:int P5 P3
# input No philosophers(n),sessions(t) II S5 S4
var s[1:n]:cap Servant P4
fa I:=1 to ns[I]:=create Servant(I) af
fa I:=1 to n-1create Philosopher(s[I],s[I+1],I,t) af # за избягване deadlock-
create Philosopher(s[1],s[n],n,t) #асиметрично свързване, т.е. разкъсва се
end #кръговата заявка (ляв ---- десен)
resource Philosopher resource Servant
import Servant left right op getfork(), relfork()
body Philosopher(l,r: cap Servant;id,t: int) body Servant( id: int)
process Phil process Server
fa I:=1 to tl.getfork(); r.getfork() do true
write(“Phil.”, id,”is eating) receive getfork();receive relfork()
l.relfork(); r.relfork() od; end; end вилицата се дава
write(“Phil.”,id,”is thinking”) само на един по реда на
af; end; end заявкитеизкл. глад
III. Децентрализирано решение P2
- всеки S взаимодейства със съседите S2 1 P3
- всяка вилица е в един от двата съседни Si P1 S1 S3
- яде се когато Si има 2 вилици 1
- в началото: = всеки Si има указатели 2 S5 S4
към съседите (чрез op, a не чрез иниц.) 0 1
= асиметрично инициализиране на вилиците P5 P4
- resource Philosopher не е нужно да се идентиф. при
getforks(), relforks() (идентично с I) вилици
-
инициализацията с вилици в main()
op forks(haveL, dirtyL,haveR,dirtyR : bool) различни за S1,Sn, S2..S(n-1)
-
Servent обслужва заяки на getforks(), relforks()
конспирацията се избягва – всеки Si освобождава по 2 използвани вилици.
-
Servent комуникира със съседите:
op needL() {send} #съсед иска
op needR() {send}
op passL() {send} #получил # информира {send} съседните
op passR() {send} # Servers какво притежава и иска
- resource Servant #Връща всички “dirty”
… # Ако всичко е наред:
proc getforks() от Phil. send eat()
send hungry() dirtyL:= true; dirtyR:= true
receive eat() от Phil. receive relforks()
end #раздава вилици на желаещите
process Server # in needR()… send r.passL()
receive links(l,r) #иниц. … ni добро решение при
receive forks(..) od; end размножаване на
do true in hungry() # заявява вилици files(database consist.)
Occam и транспютера
CSP процеси, комуникация: point to point
небуферирана
- синхронен обмен канал, link
- независимост от тактова честота вътрешни външни
- паралелен обмен (програмни) (физически)
-
база за протоколи
Транспютри на INMOS (T414) (T800)
УБ 32 битов микропр. FPU 32
Timer управление на УБ 32 битов пр.
2К RAM links 4K RAM управление
link_1 Timer на links
interfaces … link_1
32 bus link_4 interfaces …
10 MIPS 32 bit bus link_4
1.5. MFLOPS
канали:
linkYout linkYin
linkXin linkXout
Транспютерни структури
1 6 6
М 1 5 2 2 2 n
T
mux mux T
buffer Process buffer
диспечер
master
Process T
заявки резулт. данни
Process БД управление
Process
БД
….
Програмен модел
31 0 инструкции load const, add const
A - общ формат load local, store local
B - малък брой load local pointer
C стек load nonlocal, store …
WS пр: add коп данни jump, call
PC 7 0 prefix, negative prefix
OP коп (произволно дълъг оп.)
operate
OP
-
независимост от дължината на думата prefix
-
instruction prefetch pointer
пр: occam израз
x := (v+w) – (y + #24)
инструкции на ниско ниво: load nonlocal y (от А)
load local v prefix 2
load local w add constant 4 (в А)
operate add operate subtract
load local y_pointer (в А) store local x 9 bytes
Паралелизми
- микро кодируем диспечер пр: PAR
- активни процеси изпълняващи P
чакащи Q
- неактивни процеси: input R
output S
временно бл. front WS памет
back P
Q
R
WS S
PC
“start process”, “end process”
Комуникации
-
канали - синхронизация
-
“input message”, ouput message”
вътрешни канали:
А P се изпълнява WS на P
A брояч канал канал
B канал empty p
C *канал
B външни канали (link диспечеризира)
P link x link y Q
брой брой
канал канал
* *
WS на P link x link y WS за Q
P Q
*message *message
брой брой
Алтернативи:
“enable channel” “enable timer”
“disable channel” “disable timer”
“alternative wait”
Протокол на обмен:
1 1 данни 8 0 преразпределение в списъка на процеси
1 0 потвържде ние става само след предаване фиксирания
в “брой” брой байтове.
Паралелно програмиране – теоретични модели Паралелни изчисления банкова аналогия
синхронни асинхронни Sum(ti) за I(1:N)
1. Векторни 4.MIMD
2. SIMD 5. редукционен модел
3. систолични П1 П2 … Пn
-
тривиален паралелизъм t=Max(t1.. tn)
-
балансиран паралелизъм
-
степен на раздробяване синхр.
-
необходимост от координация ас.
1. П1 етаж1 2. SIMD != SPMD (асинронност)
П2
…
Пi етаж_n П1 П2 …. Пn
Sum(ti / n) за I(1:n)
opt: n =m (fine grained par) - първо разпределение
1980г. Carnegie Mellon Univ. - след това парал. изчисл.
-
недостатъци: 1. балансиране
2. точност на съотв.
3. Ti=2(C) + Tmax
3. П1 … П1j ет._1 4.
… ет._2 Пn 1.взаимно изкл.
Пi1 … Пij ет._n добър за коорд.2.shared mem. r
large grain 3.размн. данни w
- t i/o min синхронизация: - бариерна
- проблем с натоварване и общи обр. - dataflow
- добър за масово обслужване - забавяне
5. demand driven dataflow (граф-редукционен)
пр: синхронен внос/износ с изчакване на оп. до поискване.
пр: X= (-B +/- SQRT(B*B – 4AC)) /2 *A
9/5 /
B
C
Измерване на производителност
1. Закон на Grosh в менижмънта памет
2. Фон-Нойманова теснина data bus
3. Закон на Amdhal (IBM) instr.
S = T(1) / T(n) УБ
T(n) = T(1)б + (T(1)(1-б))N ИБ
S = 1 / (б+(1-б)/N) = N / (бN+(1-б)) 4. Закон на Gustafson-Barsis (1988)
пр: б = 0.67 (1-б) N
S =1/(0.67+(0.33/10)=1.42 T(1)= б+(1-б)N; T(N)=б+(1-б)=1;
б=0.004;N=1000; S ==250… S = N-(N-1)б
пр: N=10; б= 0.67; S=10-9*0.67 = 3.97
б = 0.004; S== 1000.!!!
6. Графични оценки на паралелизма
А. Граф: S B. Диаграма на Gantt (предназначение)
1 процесор S процесори
W1… Wn W1 t S
Stop … W1 W2 … Wn
Wn stop
stop S1=T(1)/T(5)=7/3=2.33
7. Гранични стойности пр: S2=N/(бN+(1-б))=
N/log2N <= S <= N =5/(0.28*5+0.72)=2.33 A
- различно натоварване; S3=N-(N-1)б= 3.86 G-B
- t комуникация (S==1/C(N)); всъщност: б=(1+4*0.33)/5= 0.467
- startup code; S2= 1.7 A; S3 = 3.13 G-B
8. Дефиниране на S (степен на ускоряване)
- степен на натоварване на проц. тривиален пар.
- T1/ T(N) T1 (серийна) S S=N <=45’
T(1) (на 1 процесор) S==1/C(N)
S > N възможно само при Т1 вместоТ(1) S==N/log2N
-
(разделяй/владей)
1 2 10 N
пр: 1 1
1 0 0 1 1
2 0 3 2 3
1 0 1 0 0 1 1 0 1 1
0 0
4 5 6 7 4 5 6 7
1 1 1 1 1 1 1 1
0 0 8 0 0 1 1 8 1 0
-
1
1 2 3 4 5 1 2 3 4 5
-
1
2 3 2 #
6 7 4 5 4 3 #
8 # 6 5 #
S=8 / 4 =2 # # 7 S = 8/6 = 1.33
== S = N/ log2N = 2.33 8
- проблем на минимакса при паралелизмите: комуник. парал.
9. Други величини, свързани с производителността:
t t % използваемост 100 принос време
-
отговор 1
n
1 2 проц. 10х10.. 1 2 3 N N заявки
Ui=Ci/(Ci+Ii) Пi=S(I)/I
производителност?
бързина на отговор решаване на ниска цена
обемни задачи
Разпаралеляване на серийни програми
- цикли 1. data dependence
- разклонения 2. реструкториране
3. разпределяне в парал. среда.
А. Формализация на data dependence (d.d)
a. S б T ако има S’ и Т’: - обръщат се към М; - S бf T (S’ пише,Т’ чете)
- S’ е преди T’; - S ба T (S’ чете,Т’ пише)
-
М се обраб. само от S’ и Т’ - S бо Т (S’ пише,Т’пише)
индиректна зависимост: S0 б S1 б.. Sn б Т
б. задължителен порядък (execution order): Si O Sj; Si(i1,..in) O Sj(j1,..jk);
в. набор IN(Si) , OUT(Si);
пр: L1: DO I=1,5 - S1( i’,i’) O S2( i’’,i’’)
L2: DO J=1,4 ако i’
S1: A(I,J) = B(I,J) + C(I,J) (S1(1,1) O S2(1,1))
S2: B(I,J+1)=A(I,J)+B(I,J) - S2(i’’,j’’) O S1(i’,j’)
ENDDO; ENDDO ако i’’
(S2(2,3) O S1(3,1))
г. почти пълен анализ: ако S1 бf S2 то S1OS2 и OUT(S1) ^ IN(S2)!=0
ако S1 ба S2 то S1 O S2 и IN(S1) ^ OUT(S2) !=0
ако S1 бо S2 то S1 O S2 и OUT(S1) ^ OUT(S2) !=0
Б. Представяне на data dependency S1 S2
1.насочен граф: V = {S1,…Sn}
G(V,E) където: E = {gi,j = (Si, Sj)} S3
gi,j – дъги на зависимости
Завършена програма в OCCAM: паралелна сортировка
1. Алгоритъм: процес 1 процес 2
accept числа highest next highest next
5 2 8 4 7 7 4 4 4 7 4
7 8 7 7 2 2
8 2 2 7 5 5
8 5 5 7 8 7
8 8 8 8
или: 5 2 8 4 7 P0 P1 P2 P3 P4 87542
C0 C1 C2 C3 C4 C5
следователно: [100]INT
[101] CHAN OF INT pipe:
за ‘I’ процес: SEQ SEQ
pipe[I] ? next pipe[I+1] ! highest
IF highest := next
next <= highest
pipe[I+1] ! next
next > highest
за всички числа в “I” стъпало: всички стъпала в паралел:
INT highest: PAR I=0 FOR 100
SEQ input IS pipe[I]
pipe[I] ? highest output IS pipe[I+1]
SEQ j=0 FOR 99 -- може и while INT highest:
INT next: SEQ
SEQ input ? highest
pipe[I] ? next SEQ j=0 FOR 99
IF INT next:
next <= highest SEQ
pipe[I+1] ! next input ? next
next >= highest IF
SEQ next<=highest
pipe[I+1] ! highest output ! next
highest := next next> highest
pipe[I+1] ! highest SEQ
output!highest
highest:= next
обща програма: output ! highest
VAL numbers IS 100:
[numbers+1] CHAN OF INT pipe:
PAR
PAR I=0 FOR numbers output ! highest
input IS pipe[I]: SEQ I=0 FOR numbers
output IS pipe[I+1]: INT unsortnumber:
INT highest: SEQ
SEQ input ? unsortnumber
input ? highest pipe[0] ? unsortnumber
SEQ j=0 FOR numbers-1 SEQ I=0 FOR numbers
INT next: INT sortnumber:
SEQ SEQ
input ? next pipe[numbers] ? sortnumbers
IF output ! sortnumber
next <= highest
output ! next
next > highest
SEQ numbers + 2 процеса
output ! highest
highest := next
оптимизация:
PAR I = 0 FOR numbers
input IS pipe[I]: VAL alreary.sorted IS I:
ouput IS pipe[I+1]: SEQ j=0 FOR already.sorted
SEQ INT pass.on:
VAL still.unsorted IS numbers-1: SEQ
INT highest: input ? pass.on
SEQ output ! pass.on
input ? highest
SEQ j=0 FOR still.unsorted – 1
INT next:
SEQ
input ? next
IF
next <= highest
output ! next
next > highest
SEQ
output ! highest
highest := next
output ! highest
-
проблем с приключване:
= спира се подаването блокира процес;
= общ сигнал за край необходима е глобална var
= подава се заключителна информация по канала.
Комуникация между тредове
А. С помощта на глобални променливи
деклариране: volatile int threadContoller;
иницииране: 1. във функция OnStartThread() – threadController = 1;
2. във функция OnStopThread() – threadController = 0;
проверка във ф-ия TreadProc() while(threadController ==1)
// тредът е активен
Б. С помощта на дефинирани user_message
осигурява двупосочност на комуникациите тред ----главна програма.
-
декларираме в .h файл: const WM_USERMSG = WM_USER + 100;
-
в ThreadView.h: afx_msg LONG OnThreadended(WPARAM,LPARAM);
-
свързваме със съобщението – вThreadView.cpp към картата на съобщенията AFX_MSG_MAP:
ON_MESSAGE( WM_TREADENDED, OnTreadEnded)
-
изпращане на съобщ. от тред към гл. програма ( в ThreadProc()):
::PostMessage((HWND)param, WM_USERMSG,0,0);
-
добавяме описание на нова ф-ия в гл. програма:
LONG CThreadView::OnTreadended( wParam, lParam) {…}
В. С помощта на събития (events object от MFC класа CEvent):
тези обекти са в състояние signaled (сигнализирани) или nonsignaled.
-
създаваме такъв обект: CEvent threadStart; //обекта е несигнализиран
-
threadStart.SetEvent(); // обекта е сигнализиран т.е свързаният с него тред може да започне.
-
във ф-ия ThreadProc() дори при спрял тред
{ .. ::WaitForSingleObject(threadStart.m_hObject, INFINITE); …}
-
периодично да пита за ново състояние на обекта (например спрян от гл. програма). Напр:
int result = ::WaitForSingleObject(threadEnd.m_hObject, 0);
Сподели с приятели: |