Laborator 4

Tematica laboratorului

1. Gruparea directa a fluxului de evenimente

Storm permite o grupare directa a tuplelor ce descriu evenimentele primite de operatori ce se identifica cu un anumit flux. Gruparea directa poate orienta evenimentele spre un anumit task in functie de ID-ul acestuia, similar cu posibilitatea exemplificata intr-o grupare custom dar fara o definire explicita a unei grupari custom. Pasii, in detaliu pentru utilizarea unei grupari directe ar fi urmatorii:

a) In definirea topologiei trebuie setata o modalitate de grupare directa catre operatorul dorit.
Nota: In exemplul de mai jos aceasta modalitate este aditionala unei alte grupari, operatorul in cauza primind in acest caz doua fluxuri de evenimente pe doua grupari diferite.

// terminal_bolt va primi un flux grupat global de la taskurile operatorului count_bolt, 
// si un alt flux grupat direct de la operatorul spout

builder.setBolt(TERMINAL_BOLT_ID, terminalbolt).globalGrouping(COUNT_BOLT_ID).directGrouping(SPOUT_ID, "secondary"); 

b) Fluxul de date (stream-ul) pentru care se realizeaza gruparea directa se declara explicit cu un nume si setand modalitatea de grupare directa in metodele declareOutputFields(). Nota: Se pot defini si fluxuri de date separate de cel implicit in Storm prin aceasta modalitate de grupare directa.

public void declareOutputFields(OutputFieldsDeclarer declarer) {

		// fluxul de date implicit
		declarer.declare(new Fields("words")); 
		// un alt flux de date - nume: "secondary", 
		// definit ca flux direct prin al doilea parametru
		declarer.declareStream("secondary", true, new Fields("globalcount"));
	}
	 

c) In metoda de initializare a operatorului ce va directiona fluxul de date (prepare() sau open()) trebuie obtinute id-urile task-urilor operatorului spre care se doreste directionarea fluxului. Operatorul destinatie este identificat prin numele definit la crearea topologiei. Id-urile taskurilor corespondente se obtin folosind metoda getComponentTasks() din cadrul contextului de topologie primit ca parametru in metoda de initializare.


private List direct_target_tasks;

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

		this.collector = collector;

		// TERMINAL_BOLT_ID = "terminal_bolt" este numele folosit la crearea topologiei in metoda main()

		this.direct_target_tasks = context.getComponentTasks("terminal_bolt");
	}
 

d) Emiterea tuplelor se va face folosind metoda emitDirect() cu precizarea fluxului de evenimente si a taskurilor spre care se doreste emiterea.


public void nextTuple() {

		if (i < sourcetext.length) {
		
			this.collector.emit(new Values(sourcetext[i]));
			i++;
		
			// se va emite direct catre primul task al operatorului destinatie, 
			// pe fluxul "secondary", un tuplu cu un camp (globalcount)
			globalcounter++;
			this.collector.emitDirect(direct_target_tasks.get(0), "secondary", new Values(globalcounter));
	
		}

	} 

2. Toleranta la caderi in Storm (notiuni de baza)

Toleranta la caderi in Storm asigura garantarea procesarii datelor transmise in cadrul fluxului. Un flux de date in Storm, este asa cum am vazut in laboratoarele anterioare, format dintr-o serie de tuple, transmise intre operatorii bolt, care pot modifica structura acestor tuple inainte de a le transmite mai departe. Putem observa ca toate tuplele derivate din acelasi flux original, impreuna cu operatorii bolt prin care trec formeaza o structura de tip arbore, unde radacina este operatorul spout care a emis forma initiala a tuplelor din fluxul de date respectiv. Numim deci aceasta structura arborele fluxului de date emis de spout.
Operatorii spout in Storm pun la dispozitie doua metode ce pot fi folosite in a observa daca un tuplu din flux a fost procesat complet pe arborele fluxului:

  • ack() - Metoda este apelata de platforma Storm din operatorul spout daca intern toti operatorii bolt de pe arborele fluxului confirma ca au executat procesarea pentru un tuplu.
  • fail() - Metoda este apelata de platforma Storm din operatorul spout daca intern macar unul din operatorii bolt de pe arborele fluxului anunta esecul procesarii unui tuplu, sau confirmarea procesarii depaseste un timp limita.

Pentru a activa apelul acestor metode sunt necesare urmatoarele:

  • In operatorul spout emiterea tuplelor sa se faca folosind signatura functiei emit() care permite specificarea unui id unic pentru tuplu (asocierea acestui id indica platformei Storm ca procesarea tuplului respectiv pe arborele fluxului trebuie confirmata).
  • In operatorii bolt emiterea tuplelor sa se faca folosind signatura functiei emit() care permite ancorarea - cea in care primul parametru al functiei este tuplul primit de operator de la operatorul precedent.
  • In operatorii bolt, dupa emiterea tuplului sa se apeleze functiile ack(tuplu) (pentru confirmarea procesarii), sau fail(tuplu) in cazul notificarii explicite a unui esec (functiile sunt membre ale instantei OutputCollector din care s-a facut si emiterea).

Pentru a garanta procesarea datelor operatorul spout poate de exemplu sa retina valorile tuplelor trimise intr-un map pe baza id-ului pana la confirmarea acestora. In metoda ack(), operatorul spout va sterge pe baza parametrului id tuplele retinute in map. In metoda fail(), operatorul spout va re-emite pe baza parametrului id tuplele retinute in map.

Surse actualizate topologie numarare cuvinte:

© 2022 Emanuel Onica. Parts of design by W3Layouts