Semin ´aˇr Java
ParalelismusRadek Ko ˇc´ı
Fakulta informaˇcn´ıch technologi´ı VUT
Obsah
Vl ´akna
Multithreading Sd´ılen´ı prostˇredk ˚u Synchronizace
Pojmy
Proces
spu ˇst ˇen ´y program s vlastn´ım adresov´ym prostorem Vl ´akno (podproces)
nez ´avisl ´a vedlejˇs´ı ´uloha spu ˇst ˇen ´a v kontextu procesu sd´ıl´ı adresov´y prostor procesu Multithreading
technika rozd ˇel ˇen´ı b ˇehu programu na v´ıce podproces ˚u typicky pro odd ˇelen´ı ˇc ´ast´ı programu, kter ´e jsou v ´azan ´e na prostˇredky
Proces a vl ´akna
Process
Vl ´akna
vl ´akna jsou reprezentov ´ana objekty
ka ˇzd ´e vl ´akno m ´a sv ˚uj jedine ˇcn ´y identifik ´ator (pˇrid ˇelen pˇri vytvoˇren´ı)
Pr ´ace s vl ´akny
Bal´ık java.lang
Thread Runnable
Reprezentace vl ´akna
instance tˇr´ıdyThread(ev. odvozen ´ych tˇr´ıd)
Vytvoˇren´ı objektu, kter´y reprezentuje b ˇeh vl ´akna rozˇs´ıˇren´ım (d ˇediˇcnost) tˇr´ıdyThread
Tˇr´ıda Thread
Metody
start()
inicializace vl ´akna vol ´a metodurun()
odvozen ´a tˇr´ıda nepˇrekr´yv ´a tuto metodu! run()
k ´od vl ´akna
odvozen ´a tˇr´ıda pˇrekr´yv ´a tuto metodu sleep(long millis)
usp ´an´ı vl ´akna na dan´y poˇcet milisekund v´yjimka InterruptedException
Uk ´azka SimpleThread
public class SimpleThread extends Thread { public SimpleThread(String str) {
super(str); }
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(i + " " + getName()); try { sleep((long)(Math.random() * 1000)); } catch (InterruptedException e) {} } System.out.println("DONE! " + getName()); } }
Uk ´azka SimpleThread
public class TwoThreadsDemo {
public static void main (String[] args) { new SimpleThread("Jamaica").start(); new SimpleThread("Fiji").start(); }
Rozhran´ı Runnable
Metody
run()
k ´od vl ´akna
implementuj´ıc´ı tˇr´ıda mus´ı implementovat tuto metodu
Implementuj´ıc´ı tˇr´ıda nen´ı vl ´akno
informace, ˇze instance tˇr´ıdy definuje chov ´an´ı vl ´akna
Uk ´azka Clock
public class Clock extends Applet
implements Runnable { private Thread clockThread = null; public void start() {
if (clockThread == null) {
clockThread = new Thread(this, "Clock"); clockThread.start();
} }
public void paint(Graphics g) { ... } //get the time and convert it to a date ...
g.drawString(dateFormatter.format(date), 5, 10); }
Uk ´azka Clock
public void run() {
Thread myThread = Thread.currentThread(); while (clockThread == myThread) {
repaint();
try { Thread.sleep(1000); } catch (InterruptedException e) {
//the VM doesn’t want us to sleep anymore, //so get back to work
} } }
//overrides Applet’s stop method, not Thread’s public void stop() {
clockThread = null; }
Tˇr´ıda Thread nebo rozhran´ı Runnable
hodn ˇe tˇr´ıd potˇrebuje d ˇedit (rozˇsiˇrovat) jinou tˇr´ıdu
public class Clock extends Applet implements Runnable { ...
}
Java neumo ˇz ˇnuje v´ıcen ´asobnou d ˇediˇcnost
⇒rozhran´ıRunnable
clockThread = new Thread(this, "Clock"); clockThread.start();
ˇ
Zivotn´ı cyklus vl ´akna
New Thread
Dead
Not Runnable
Runnable start
The run method terminates runnable
ˇ
Zivotn´ı cyklus vl ´akna
Vytvoˇren´ı
new Thread(this, "Clock")
Spu ˇst ˇen´ı
start()
k ´od vl ´akna v metod ˇerun()(ThreadneboRunnable)
Ukon ˇcen´ı
pˇrirozen ´ym dob ˇehnut´ım metodyrun()
existuj´ı i jin ´e metody, ty se ale nedoporu ˇcuj´ı (deprecated) stop()
destroy()
ˇ
Zivotn´ı cyklus vl ´akna
Test stavu vl ´akna
isAlive()
⇒truepokud bylo vl ´akno spu ˇst ˇeno a nebylo ukon ˇceno
(nen´ı dead)
⇒falsepokud vl ´akno nebylo spu ˇst ˇeno, nebo bylo ukon ˇceno (je dead)
ˇ
Zivotn´ı cyklus vl ´akna
Pozastaven´ı (stav not runnable)
sleep(...) wait()
pˇri i/o operaci Uvoln ˇen´ı (stav runnable)
uplynut´ı doby ˇcek ´an´ı (vizsleep(...))
notify(), notifyAll()
Vl ´akna
Chov ´an´ı vl ´aken z ´avis´ı na jejich intern´ı reprezentaci. Native threads
vl ´akna opera ˇcn´ıho syst ´emu v´ıce procesor ˚u
preemtivn´ı pl ´anov ´an´ı Green threads
vl ´akna na ´urovni JVM jeden procesor
vl ´akno se mus´ı pˇrepnout samo jiˇz nejsou moc b ˇe ˇzn ´a
Pl ´anov ´an´ı vl ´aken
Jedna CPU
prov ´ad ˇen´ı vl ´aken se mus´ı pl ´anovat
pl ´anova ˇc v Jav ˇe je fixed-priority scheduling
pl ´anuje vl ´akna na z ´aklad ˇe jejich priority relativn ˇe k ostatn´ım vl ´akn ˚um
Priorita
vl ´akno d ˇed´ı prioritu vl ´akna, ve kter ´em bylo vytvoˇreno
ˇcten´ı/zm ˇena priority: getPriority(),setPriority()
rozsah:Thread.MIN PRIORITY–
Pl ´anov ´an´ı vl ´aken
Pl ´anova ˇc
vybere vl ´akno (runnable) s nejvyˇsˇs´ı prioritou pokud je jich v´ıce se stejnou prioritou, vybere n ´ahodn ˇe/spravedliv ˇe
Vl ´akno b ˇe ˇz´ı dokud se nestane:
na syst ´emu s time-slicing ub ˇehne pˇrid ˇelen ´e ˇcasov ´e kvantum
jin ´e vl ´akno s vyˇsˇs´ı prioritou pˇrejde do stavu runnable
skon ˇc´ı metodarun()
vl ´akno se vzd ´a procesoru
vl ´akno se dobrovoln ˇe vzd ´a procesoru – zpr ´avayield()
Thread
Thread Thread.currentThread()
vr ´at´ı pr ´av ˇe b ˇe ˇz´ıc´ı vl ´akno (objekt)
setName / getName
ka ˇzd ´e vl ´akno m ˚u ˇze m´ıt sv ´e jm ´eno
long getId()
ka ˇzd ´e vl ´akno m ´a sv ˚uj jedine ˇcn ´y identifik ´ator
Thread.State getState()
vrac´ı stav vl ´akna
Thread
enum Thread.State
NEW – vl ´akno pouze vytvoˇreno RUNNABLE – b ˇe ˇz´ıc´ı vl ´akno
BLOCKED – vl ´akno ˇcek ´a na monitoru
WAITING – vl ´akno ˇcek ´a na jin ´e vl ´akno / operaci (ˇcasov ˇe neomezeno)
TIMED WAITING – vl ´akno ˇcek ´a na jin ´e vl ´akno (ˇcasov ˇe omezeno)
Thread – stavy vl ´akna
WAITING Object.wait() Thread.join() LockSupport.park() I/O operace TIMED WAITING Object.wait(long) Thread.join(long) Thread.sleep(long) LockSupport.parkNanos(long) LockSupport.parkUntil(long) java.util.concurrent.locks.LockSupportSkupina vl ´aken
java.lang.ThreadGroup
vl ´akno patˇr´ı vˇzdy k n ˇejak ´e skupin ˇe existuje implicitn´ı syst ´emov ´a skupina skupiny tvoˇr´ı stromovou hierarchii pˇr´ıslu ˇsnost ke skupin ˇe je nem ˇenn ´a
vl ´akno implicitn ˇe ”d ˇed´ı” skupinu vytv ´aˇrej´ıc´ıho vl ´akna
Synchronizace vl ´aken
Probl ´em producent–konzument
jedno vl ´akno (producent) zapisuje na sd´ılen ´e m´ısto data druh ´e vl ´akno (konzument) tato data ˇcte
operace z ´apis/ˇcten´ı se mus´ı stˇr´ıdat! Uk ´azkov´y pˇr´ıklad
tˇr´ıdaProducer– producent
tˇr´ıdaConsumer– konzument
Producent–konzument: Producent
public class Producer extends Thread { private CubbyHole cubbyhole;
public Producer(CubbyHole c) { cubbyhole = c;
}
public void run() {
for (int i = 0; i < 10; i++) { cubbyhole.put(i); try { sleep((int)(Math.random() * 100)); } catch (InterruptedException e) { } } }
Producent–konzument: Konzument
public class Consumer extends Thread { private CubbyHole cubbyhole;
public Consumer(CubbyHole c) { cubbyhole = c;
}
public void run() { int value = 0;
for (int i = 0; i < 10; i++) { value = cubbyhole.get(); }
} }
Producent–konzument: CubbyHole
public class CubbyHole { private int contents; public int get() {
return contents; }
public int put(int value) { contents = value;
} }
Producent–konzument: moˇzn ´e probl ´emy
Producent je rychlejˇs´ı
konzument m ˚u ˇze ”prop ´asnout” ˇc´ısla Konzumer je rychlejˇs´ı
konzument ˇcte stejn ´e ˇc´ıslo v´ıcekr ´at
⇒race condition
dv ˇe (pˇr´ıp. v´ıce) vl ´akna ˇctou/zapisuj´ı sd´ılen ´a data; v´ysledek zavis´ı na ˇcasov ´an´ı (jak jsou vl ´akna pl ´anov ´ana)
Producent–konzument: moˇzn ´e probl ´emy
⇒race condition
dv ˇe (pˇr´ıp. v´ıce) vl ´akna ˇctou/zapisuj´ı sd´ılen ´a data; v´ysledek zavis´ı na ˇcasov ´an´ı (jak jsou vl ´akna pl ´anov ´ana)
Nutn ´a synchronizace:
vl ´akna nesm´ı sou ˇcasn ˇe pˇristoupit ke sd´ılen ´emu objektu producent mus´ı indikovat, ˇze hodnota je pˇripravena; nezapisuje dokud si hodnotu nepˇre ˇcte konzument konzument mus´ı indikovat, ˇze pˇre ˇcetl hodnotu; ne ˇcte dokud producent nezap´ıˇse novou hodnotu
Monitor
Monitor
kritick ´e sekce
uzamˇcen´ı objektu pˇri pˇr´ıstupu ke kritick ´e sekci
pokud je objekt uzamˇcen, nikdo jin ´y nem ˚u ˇze pˇristupovat je kritick´ym sekc´ım objektu
odemknut´ı objektu pˇri v´ystupu z kritick ´e sekce Monitor v Jav ˇe (intrinsic lock)
sou ˇc ´ast´ı ka ˇzd ´eho objektu (tˇr´ıdaObject)
kl´ıˇcov ´e slovosynchronized
Kritick ´a sekce v Jav ˇe metoda
Producent–konzument: CubbyHole
public class CubbyHole { private int contents;
private boolean available = false; public synchronized int get() {
//CubbyHole locked by the Producer ...
// CubbyHole unlocked by the Producer }
public synchronized int put(int value) { // CubbyHole locked by the Consumer ...
// CubbyHole unlocked by the Consumer }
Z ´amek: reentrantn´ı
public class Reentrant {
public synchronized void a() { b();
System.out.println("here I am, in a()"); }
public synchronized void b() {
System.out.println("here I am, in b()"); }
Producent–konzument: CubbyHole
public class CubbyHole {private int contents;
private boolean available = false; public synchronized int get() {
if (available == true) { available = false; return contents; }
}
public synchronized int put(int value) { if (available == false) {
available = true; contents = value; }
Producent–konzument: synchronizace
Nutn ´a synchronizace:
vl ´akna nesm´ı sou ˇcasn ˇe pˇristoupit ke sd´ılen ´emu objektu producent mus´ı indikovat, ˇze hodnota je pˇripravena konzument mus´ı indikovat, ˇze pˇre ˇcetl hodnotu
producent nezapisuje dokud si hodnotu nepˇre ˇcte konzument
konzument ne ˇcte dokud producent nezap´ıˇse novou hodnotu
Producent–konzument: synchronizace
Nutn ´a synchronizace:
vl ´akna nesm´ı sou ˇcasn ˇe pˇristoupit ke sd´ılen ´emu objektu producent mus´ı indikovat, ˇze hodnota je pˇripravena konzument mus´ı indikovat, ˇze pˇre ˇcetl hodnotu
producent nezapisuje dokud si hodnotu nepˇre ˇcte konzument
konzument ne ˇcte dokud producent nezap´ıˇse novou hodnotu
Synchronizace vl ´aken (tˇr´ıda Object)
wait()
aktu ´aln´ı vl ´akno bude ˇcekat, dokud se nezavol ´anotify()
(notifyAll()) nad objektem
wait(long timeout)
. . . nebo neuplyne timeout
notify()
vzbud´ı jedno vl ´akno ˇcekaj´ıc´ı na monitoru objektu
notifyAll()
Synchronizace vl ´aken (tˇr´ıda Object)
wait(), ...
pˇred suspendov ´an´ım vl ´akna se odemkne monitor pokud vl ´akno vlastn´ı v´ıce monitor ˚u, odemkne se pouze monitor dan ´eho objektu
notify(), ...
ˇr´ızen´ı nen´ı okamˇzit ˇe pˇred ´ano vzbuzen ´emu vl ´aknu
Tyto metody m ˚u ˇze volat pouze to vl ´akno, kter ´e je vlastn´ıkem monitoru
vstoupen´ı do kritick ´e sekce (synchronized) – metoda,
Producent–konzument: CubbyHole
public synchronized int get() { while (available == false) {
try {
// wait for Producer to put value wait();
} catch (InterruptedException e) { } }
available = false;
// notify Producer that value has been // retrieved
notifyAll(); return contents; }
Producent–konzument: CubbyHole
public synchronized void put(int value) { while (available == true) {
try {
// wait for Consumer to get value wait();
} catch (InterruptedException e) { } }
contents = value; available = true;
// notify Consumer that value has been set notifyAll();
Synchronizace vl ´aken – efektivita
Pˇri uzamˇcen´ı objektu se zv´yˇs´ı n ´aklady na re ˇzii uv ´a ˇzit zm ˇenu n ´avrhu
pou ˇz´ıt z ´amek (synchronized) na konkr ´etn´ı objekt neodb ´yt soub ˇe ˇzn ´y pˇr´ıstup synchronizac´ı vˇsech metod
Synchronizace vl ´aken
Blok jako kritick ´a sekce
stejn ´y princip synchronizace
metody se nemus´ı deklarovat jako synchronizovan ´e deklaruje se objekt, jeho ˇz monitor se pou ˇzije pˇri obsluze kritick ´e sekce
synchronized(cybbyhole) { cybbyhole.put(i); } synchronized(cybbyhole) { value = cybbyhole.get(); }
Pˇreruˇsen´ı vl ´akna
Operaceinterrupt()(Thread)
je nastaven pˇr´ıznak
pˇreru ˇsiteln ´e operace (sleep(), ...) testuj´ı tento pˇr´ıznak
Vl ´akno je b ˇe ˇz´ıc´ı
pouze se nastav´ı pˇr´ıznak
lze otestovat (isInterrupted())
Vl ´akno je ˇcekaj´ıc´ı je nastaven pˇr´ıznak
vl ´akno se rozb ˇehne a vygeneruje se v´yjimka (InterruptedException)
Viditelnost prom ˇenn ´e
Viditelnost sd´ılen ´e prom ˇenn ´e
zm ˇena hodnoty se nemus´ı projevit okamˇzit ˇe (optimalizace)
Proˇc nepouˇz´ıvat stop() a suspend()?
stop()uvoln´ı vˇsechny monitory blokovan ´e vl ´aknem
nebezpe ˇc´ı pˇr´ıstupu k objekt ˚um v nekonzistentn´ım stavu
⇒pˇrirozen ´e ukon ˇcen´ı metodyrun()
suspend(),resume()
suspenduje/uvoln´ı vl ´akno
suspendovan ´e vl ´akno drˇz´ı monitor (viz kritick ´a sekce); vl ´akno, kter ´e ho m ´a uvolnit (viz resume), mus´ı vstoupit do
t ´eto sekce⇒dead-lock
⇒wait(),notify()
v´ıce na
Explicitn´ı z ´amky
java.util.concurrent.locks
rozhran´ıLock,Condition,ReadWriteLock
tˇr´ıdyReentrantLock,ReentrantReadWriteLock
tˇr´ıdaReentrantReadWriteLock.ReadLock
Lock
public interface Lock { void lock();
void lockInterruptibly()
throws InterruptedException; boolean tryLock();
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition(); }
ReentrantLock implements Lock
Lock lock = new ReentrantLock(); lock.lock(); try { ... } finally { lock.unlock(); }
Lock a Condition
public interface Condition {
void await() throws InterruptedException; boolean await(long time, TimeUnit unit)
throws InterruptedException; long awaitNanos(long nanosTimeout)
throws InterruptedException; void awaitUninterruptibly()
throws InterruptedException; long awaitUntil(Date deadline)
throws InterruptedException; void signal();
void signalAll(); }
Vztah z ´amk ˚u a JVM
Intrinsic locks m ´en ˇe flexibiln´ı
JVM v´ı o vztahu vl ´akna a z ´amku Explicit locks
flexibiln´ı
Synchronizovan ´e struktury
java.util.concurrent BlockingQueue SynchronousQueue Semaphore . . .private BlockingQueue cubbyhole; cubbyhole.put(i);
Synchronizovan ´e kolekce
Statick ´e metody tˇr´ıdyCollections
XXX synchronizedXXX(XXX c);
Collection synchronizedSet(Collection c);
Map synchronizedMap(Map c); . . .
Synchronizovan ´e kolekce
public class SynchronDemo {public static void main(String[] args) { List seznam = new ArrayList();
seznam.add(new Integer(20)); ...
Iterator i = seznam.iterator();
// Simulace soubezne akce jineho vlakna!! seznam.remove(new Integer(20));
// ...
Integer c = (Integer) i.next(); }
Synchronizovan ´e kolekce
Synchronization wrapper
public class SynchronDemo2 {
public static void main(String[] args) { List seznam = Collections.
synchronizedList(new ArrayList()); seznam.add(new Integer(20));
synchronized(seznam) {
Iterator i = seznam.iterator(); Integer c = (Integer) i.next(); }
} }
SynchronizedCollection
static class SynchronizedCollection<E> ... { Collection<E> c; // Backing Collection Object mutex; // Object on which to
// synchronize SynchronizedCollection(Collection<E> c) { this.c = c; mutex = this; } SynchronizedCollection(Collection<E> c, Object mutex) { this.c = c; this.mutex = mutex; }
SynchronizedCollection
public boolean add(E o) {
synchronized(mutex) {return c.add(o);} }
public Iterator<E> iterator() { return c.iterator();
// Must be manually synchronized by user! }
Zdroje informac´ı
http://java.sun.com/docs/books/tutorial/essential/ http://www.programming-x.com/programming/
brian-goetz.html http://www.javaworld.com