Laborator 2

Tematica laboratorului

1. Crearea unei aplicatii ce foloseste Storm

Pentru crearea unui proiect Storm in mod local (pentru testare pe masina proprie) sunt suficienti urmatorii pasi in Eclipse:

  • Se creeaza un nou proiect Maven in Eclipse (se foloseste arhetipul maven-archetype-quickstart din cele de la org.apache.maven, iar ca exemplu pentru Group Id: ebs.lab si pentru Artifact Id: Lab 01).
  • In configurarea JRE System Library, se modifica versiunea de JRE cu cea disponibila pe sistem (workspace default jre - e necesar minim Java 8). De asemenea e necesara setarea corespondenta pentru Compiler Compliance (1.8).
  • Se adauga o dependenta pentru Storm in fisierul pom.xml; click pe fisierul din radacina proiectului, zona Dependencies, Add cu urmatoarea configuratie:
        GroupId org.apache.storm
        ArtifactId storm-core
        Version 1.2.4
      
    (Se poate utiliza si o versiune mai noua, ex., 2.1.1)

Adaugarea dependentei va rezulta in downloadul pachetelor Storm si a dependentelor asociate. In sectiunea Maven Dependencies, pentru pachetul storm-core (click dreapta) actionati si optiunile Download JavaDoc si Download Sources pentru obtinerea surselor adnotate ale pachetelor storm.

2. Crearea unei topologii simple in Storm

O topologie Storm e reprezentata de o serie de operatori inlantuiti care paseaza intre ei mesaje ce contin date (de exemplu asociate unei serii de evenimente), cu scopul de a executa o anume procesare asupra acestui flux de date. Acesti operatori pot fi distribuiti pe mai multe masini, si inlantuiti in diferite moduri pentru obtinerea de eficienta in procesare. Exista doua tipuri principale de operatori:

  • spout: operator de tip sursa de data care reprezinta originea fluxului emis in topologia storm (acest operator fie poate genera fluxul, fie de obicei in cazurile practice il preia din alte surse ex., o baza de date)
  • bolt: operator de tip procesare care efectueaza o anume procesare asupra fluxului de date primit inainte de a-l emite mai departe, sau a obtine rezultatul (in cazul operatorilor bolt terminali)

In continuare vom defini o topologie simpla de test, utilizata frecvent ca exemplu, care are ca functionalitate procesarea unui flux de cuvinte cu scopul de a le numara. In compunerea acestei topologii definim 4 operatori:

  • SourceTextSpout - un operator spout sursa care va genera o serie de cuvinte ca flux sursa de date
  • SplitTextBolt - un operator bolt care va imparti cuvintele din text
  • WordCountBolt - un operator bolt care va numara cuvintele identice
  • TerminalBolt - un operator bolt terminal care va centraliza si afisa rezultatul numaratorii

In urmatoarele sunt detaliate cateva dintre functiile abstracte implementate de clasele operatorilor de mai sus.

In operatorul spout care extinde BaseRichSpout:

  • public void declareOutputFields(OutputFieldsDeclarer declarer)
    Fluxul de date emis in topologia Storm e format din tuple de forma (nume,valoare). Prin argumentul declarer se definesc campurile din tuplele emise de spout (numele acestora).
  • public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector)
    Metoda este apelata la initializarea spoutului. Argumentul map include argumentele de configurare, context include informatii despre structura topologiei, si collector ofera metode pentru emiterea tuplelor.
  • public void nextTuple()
    Metoda nextTuple() este apelata de platforma Storm periodic intr-o bucla continua fiind metoda care are rolul de a emite un tuplu din streamul de date.

In operatorii bolt care extind BaseRichBolt:

  • public void prepare(Map<String, Object> stormConf, TopologyContext context, OutputCollector collector)
    Este o metoda similara cu open din spout fiind apelata la initializarea operatorului bolt. Aici ar trebui incluse initializarile membrilor operatorului (in special daca acestia nu sunt serializabili).
  • public void declareOutputFields(OutputFieldsDeclarer declarer)
    Este o metoda similara cu cea din operatorul spout.
  • public void execute(Tuple input)
    Metoda este executata automat de platforma Storm la fiecare primire a unui tuplu input. Aceasta metoda trebuie sa contina codul de procesare al datelor primite in tuple pe fluxul de date.
  • public void cleanup()
    Aceasta metoda este apelata la inchiderea unui operator bolt cand/daca executia topologiei este finalizata. Suprascrierea implementarii acestei metode are de obicei sens in operatorii bolt terminali.

Operatorii sunt reuniti sub forma unei topologii dupa cum s-a mentionat. Pentru cei de mai sus o varianta de topologie se regaseste in aceasta clasa principala a aplicatiei care include functia main ce ruleaza topologia in cauza. Metodele esentiale pentru crearea si rularea topologiei sunt urmatoarele (in descriere sumara):

  • setSpout() - apelata dintr-o instanta a clasei TopologyBuilder seteaza operatorul spout pentru respectiva topologie
  • setBolt() - apelata dintr-o instanta a clasei ToplogyBuilder adauga un operator bolt pentru respectiva topologie; un bolt este legat de precedentul operator stabilind un mod de grupare printr-o metoda de tip "Grouping" apelata consecutiv dupa "setBolt"
  • submitTopology() - apelata dintr-o instanta a clasei LocalCluster porneste executia unei topologii
  • killTopology() - apelata dintr-o instanta a clasei LocalCluster opreste executia unei topologii

3. Fine tuning tips

Fiecare operator consumator are asociat un buffer ce retine in coada datele primite pana ce acestea sunt consumate de operatorul respectiv. Dimensiunea acestui buffer poate fi setata ca optiune pe configuratia pasata topologiei (instanta clasei Config). Valoarea reprezinta numarul de tuple retinute maxim in buffer si trebuie sa fie obligatoriu o putere a lui 2. Pot exista mai multi producatori care sa umple un buffer de consumator, deci acesta se poate ajusta corespunzator nivelului de incarcare. Un buffer de dimensiuni mai mari va ocupa mai multa memorie (chiar daca nu este plin). Pe de alta parte, un buffer de dimensiuni mai mici daca este plin poate intarzia trimiterea de noi date din partea producatorilor. Exemplu de setare:

config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,1024);

Trimiterea de tuple se poate face fie singular - cate un tuplu - fie in batch-uri de mai multe tuple. Dimensiunea acestui batch este configurabila. Folosirea de batch-uri scade numarul de operatii de sincronizare necesare la trimiterea fiecarui tuplu, ceea ce creste valoarea de throughput. Pe de alta parte latenta trimiterii fiecarui tuplu poate fi crescuta din cauza asteptarii pentru umplerea batch-ului. Prin setarea dimensiunii batch-ului la 1 se anuleaza practic utilizarea acestui mod (se favorizeaza latentele).

config.put(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE,1);

In versiunea Storm 2.1.0 sau mai noua optiunea de mai sus a fost redenumita ca: TOPOLOGY_TRANSFER_BATCH_SIZE.

© 2022 Emanuel Onica. Parts of design by W3Layouts