1. Общ преглед
Този урок ще бъде въведение в Apache Storm, разпределена система за изчисления в реално време.
Ще се съсредоточим върху и ще обхванем:
- Какво точно представлява Apache Storm и какви проблеми решава
- Неговата архитектура и
- Как да го използвам в проект
2. Какво представлява Apache Storm?
Apache Storm е безплатна и с отворен код разпределена система за изчисления в реално време.
Той осигурява толерантност към грешки, мащабируемост и гарантира обработка на данни и е особено добър при обработката на неограничени потоци от данни.
Някои добри случаи на използване на Storm могат да бъдат обработка на операции с кредитни карти за откриване на измами или обработка на данни от интелигентни домове за откриване на дефектни сензори.
Storm позволява интеграция с различни бази данни и системи за опашки, налични на пазара.
3. Зависимост на Maven
Преди да използваме Apache Storm, трябва да включим зависимостта на ядрото на бурята в нашия проект:
org.apache.storm storm-core 1.2.2 provided
Трябва да използваме предоставения обхват само ако възнамеряваме да стартираме нашето приложение на клъстера Storm.
За да стартираме приложението локално, можем да използваме така наречения локален режим, който ще симулира клъстера Storm в локален процес, в такъв случай трябва да премахнем предоставеното.
4. Модел на данни
Моделът на данни на Apache Storm се състои от два елемента: кортежи и потоци.
4.1. Tuple
А кортеж е подреден списък с имена полета с видове динамични. Това означава, че не е нужно изрично да декларираме видовете полета.
Storm трябва да знае как да сериализира всички стойности, които се използват в кортеж. По подразбиране той вече може да сериализира примитивни типове, низове и байтови масиви.
И тъй като Storm използва Kryo сериализация, трябва да регистрираме сериализатора с помощта на Config, за да използваме персонализираните типове. Можем да направим това по един от двата начина:
Първо, можем да регистрираме класа за сериализиране, използвайки пълното му име:
Config config = new Config(); config.registerSerialization(User.class);
В такъв случай Kryo ще сериализира класа, използвайки FieldSerializer. По подразбиране това ще сериализира всички непостоянни полета на класа, както частни, така и публични.
Или вместо това можем да предоставим както класа за сериализиране, така и сериализатора, който искаме Storm да използва за този клас:
Config config = new Config(); config.registerSerialization(User.class, UserSerializer.class);
За да създадем персонализиран сериализатор, трябва да разширим общия клас сериализатор, който има два метода за писане и четене.
4.2. Поток
А на живо е основната черпене в екосистемата буря. В поток е неограничена последователност на кортежи.
Storms позволява паралелно обработване на множество потоци.
Всеки поток има идентификатор, който се предоставя и присвоява по време на декларацията.
5. Топология
Логиката на приложението Storm в реално време е включена в топологията. Топологията се състои от чучури и болтове .
5.1. Чучур
Чучурите са източниците на потоците. Те излъчват кортежи към топологията.
Кортежи могат да се четат от различни външни системи като Kafka, Kestrel или ActiveMQ.
Чучурите могат да бъдат надеждни или ненадеждни . Надежден означава, че чучурът може да отговори, че кортежът, който не е успял да бъде обработен от Storm. Ненадежден означава, че чучурът не отговаря, тъй като ще използва механизъм за огън и забрава за излъчване на кортежите.
За да създадем персонализиран чучур, трябва да внедрим интерфейса IRichSpout или да разширим който и да е клас, който вече реализира интерфейса, например абстрактен клас BaseRichSpout .
Нека създадем ненадежден чучур:
public class RandomIntSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector outputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); outputCollector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis())); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp")); } }
Нашият персонализиран RandomIntSpout ще генерира произволно цяло число и клеймо за всяка секунда.
5.2. Болт
Болтовете обработват кортежи в потока. Те могат да извършват различни операции като филтриране, агрегиране или персонализирани функции.
Някои операции изискват множество стъпки и по този начин ще трябва да използваме множество болтове в такива случаи.
За да създадем персонализиран Bolt , трябва да внедрим IRichBolt или за по-прости операции IBasicBolt интерфейс.
Има и множество помощни класове за внедряване на Bolt. В този случай ще използваме BaseBasicBolt :
public class PrintingBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { System.out.println(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
Този потребителски PrintingBolt просто ще отпечата всички кортежи на конзолата.
6. Създаване на проста топология
Нека обединим тези идеи в една проста топология. Нашата топология ще има един чучур и три болта.
6.1. RandomNumberSpout
В началото ще създадем ненадежден чучур. Той ще генерира произволни цели числа от диапазона (0,100) всяка секунда:
public class RandomNumberSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector collector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); collector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); int operation = random.nextInt(101); long timestamp = System.currentTimeMillis(); Values values = new Values(operation, timestamp); collector.emit(values); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }
6.2. FilteringBolt
След това ще създадем болт, който ще филтрира всички елементи с операция, равна на 0:
public class FilteringBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { int operation = tuple.getIntegerByField("operation"); if (operation > 0) { basicOutputCollector.emit(tuple.getValues()); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }
6.3. AggregatingBolt
След това нека създадем по-сложен болт, който ще събере всички положителни операции от всеки ден.
За тази цел ще използваме специфичен клас, създаден специално за внедряване на болтове, които работят на прозорци, вместо да работят на единични кортежи: BaseWindowedBolt .
Windows е основна концепция при обработката на потоци, разделяща безкрайните потоци на крайни парчета. След това можем да приложим изчисления към всеки парче. Обикновено има два вида прозорци:
Времевите прозорци се използват за групиране на елементи от даден период от време, като се използват времеви марки . Времевите прозорци могат да имат различен брой елементи.
Прозорците за броене се използват за създаване на прозорци с определен размер . В такъв случай всички прозорци ще имат еднакъв размер и прозорецът няма да бъде излъчен, ако има по-малко елементи от определения размер.
Нашият AggregatingBolt ще генерира сумата от всички положителни операции от времеви прозорец, заедно с неговите начални и крайни времеви марки:
public class AggregatingBolt extends BaseWindowedBolt { private OutputCollector outputCollector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.outputCollector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override public void execute(TupleWindow tupleWindow) { List tuples = tupleWindow.get(); tuples.sort(Comparator.comparing(this::getTimestamp)); int sumOfOperations = tuples.stream() .mapToInt(tuple -> tuple.getIntegerByField("operation")) .sum(); Long beginningTimestamp = getTimestamp(tuples.get(0)); Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1)); Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp); outputCollector.emit(values); } private Long getTimestamp(Tuple tuple) { return tuple.getLongByField("timestamp"); } }
Имайте предвид, че в този случай получаването на първия елемент от списъка е безопасно. Това е така, защото всеки прозорец се изчислява с помощта на полето за времеви клей на Tuple, така че във всеки прозорец трябва да има поне един елемент.
6.4. FileWritingBolt
Накрая ще създадем болт, който ще вземе всички елементи със sumOfOperations по-големи от 2000, ще ги сериализира и ще ги запише във файла:
public class FileWritingBolt extends BaseRichBolt { public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class); private BufferedWriter writer; private String filePath; private ObjectMapper objectMapper; @Override public void cleanup() { try { writer.close(); } catch (IOException e) { logger.error("Failed to close writer!"); } } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); try { writer = new BufferedWriter(new FileWriter(filePath)); } catch (IOException e) { logger.error("Failed to open a file for writing.", e); } } @Override public void execute(Tuple tuple) { int sumOfOperations = tuple.getIntegerByField("sumOfOperations"); long beginningTimestamp = tuple.getLongByField("beginningTimestamp"); long endTimestamp = tuple.getLongByField("endTimestamp"); if (sumOfOperations > 2000) { AggregatedWindow aggregatedWindow = new AggregatedWindow( sumOfOperations, beginningTimestamp, endTimestamp); try { writer.write(objectMapper.writeValueAsString(aggregatedWindow)); writer.newLine(); writer.flush(); } catch (IOException e) { logger.error("Failed to write data to file.", e); } } } // public constructor and other methods }
Имайте предвид, че не е нужно да декларираме изхода, тъй като това ще бъде последният болт в нашата топология
6.5. Изпълнение на топологията
И накрая, можем да съберем всичко и да изпълним нашата топология:
public static void runTopology() { TopologyBuilder builder = new TopologyBuilder(); Spout random = new RandomNumberSpout(); builder.setSpout("randomNumberSpout"); Bolt filtering = new FilteringBolt(); builder.setBolt("filteringBolt", filtering) .shuffleGrouping("randomNumberSpout"); Bolt aggregating = new AggregatingBolt() .withTimestampField("timestamp") .withLag(BaseWindowedBolt.Duration.seconds(1)) .withWindow(BaseWindowedBolt.Duration.seconds(5)); builder.setBolt("aggregatingBolt", aggregating) .shuffleGrouping("filteringBolt"); String filePath = "./src/main/resources/data.txt"; Bolt file = new FileWritingBolt(filePath); builder.setBolt("fileBolt", file) .shuffleGrouping("aggregatingBolt"); Config config = new Config(); config.setDebug(false); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Test", config, builder.createTopology()); }
To make the data flow through each piece in the topology, we need to indicate how to connect them. shuffleGroup allows us to state that data for filteringBolt will be coming from randomNumberSpout.
For each Bolt, we need to add shuffleGroup which defines the source of elements for this bolt. The source of elements may be a Spout or another Bolt. And if we set the same source for more than one bolt, the source will emit all elements to each of them.
In this case, our topology will use the LocalCluster to run the job locally.
7. Conclusion
В този урок въведохме Apache Storm, разпределена система за изчисления в реално време. Създадохме чучур, няколко болта и ги събрахме в пълна топология.
И както винаги, всички мостри на кода могат да бъдат намерени в GitHub.