From 8b72c79475051643ae9cdfe25fdce3651abf26eb Mon Sep 17 00:00:00 2001 From: Igor Date: Wed, 17 Nov 2021 11:51:55 +0100 Subject: [PATCH] added myaggregator and modified classes --- 2021.11.16.txt | 73 ++++++++++++++++++ src/main/java/com/release11/Main.java | 17 ++-- src/main/java/com/release11/MyAggregator.java | 17 +++- target/classes/com/release11/Main$1.class | Bin 1783 -> 1987 bytes target/classes/com/release11/Main.class | Bin 2427 -> 2427 bytes .../classes/com/release11/MyAggregator.class | Bin 0 -> 1319 bytes 6 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 2021.11.16.txt create mode 100644 target/classes/com/release11/MyAggregator.class diff --git a/2021.11.16.txt b/2021.11.16.txt new file mode 100644 index 0000000..01761d5 --- /dev/null +++ b/2021.11.16.txt @@ -0,0 +1,73 @@ +package com.release11; + +import com.mysql.cj.jdbc.MysqlDataSource; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.camel.*; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jms.JmsComponent; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.SimpleRegistry; +import org.apache.log4j.BasicConfigurator; + +public class Main { + + public static void main(String[] args) throws Exception { + + //MyBuilder myBuilder = new MyBuilder(); + //myBuilder.doRandomMaterials(); + //myBuilder.doRandomPackage(); + + //to("log:?level=INFO&showHeaders=true&showBody=true") + + MysqlDataSource source = new MysqlDataSource(); + String jacek = "jdbc:mysql://10.101.111.19:3306/camel_db"; + source.setURL(jacek); + source.setUser("root"); + source.setPassword("admin"); + SimpleRegistry registry = new SimpleRegistry(); + registry.bind("source", MysqlDataSource.class, source); + CamelContext context = new DefaultCamelContext(registry); + BasicConfigurator.configure(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.101.111.19:8088"); + connectionFactory.setUserName("admin"); + connectionFactory.setPassword("admin"); + context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); + + DAO myFunctions = new DAO(); + + + + + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + + from("direct:start") + .setBody(constant("SELECT * FROM material LIMIT 10")) + .to("jdbc:source") + .split(body()) + .to("activemq:queue:material"); + + from("activemq:queue:material") + .split(body()) + //.bean(myFunctions, "getId") + .setHeader("id", simple("${body[id]}")) + .setBody(constant("SELECT * FROM package WHERE material_id = :?id")) + .to("jdbc:source?useHeadersAsParameters=true") + .to("activemq:queue:materialPackage"); + //.to("log:?level=INFO&showBody=true"); + + } + }); + + context.start(); + ProducerTemplate template = context.createProducerTemplate(); + template.sendBody("direct:start", null); + Thread.sleep(1000); + //template.setDefaultEndpointUri("activemq:queue:material"); + + context.stop(); + } +} diff --git a/src/main/java/com/release11/Main.java b/src/main/java/com/release11/Main.java index bff3989..963b31a 100644 --- a/src/main/java/com/release11/Main.java +++ b/src/main/java/com/release11/Main.java @@ -45,19 +45,20 @@ public class Main { public void configure() throws Exception { from("direct:start") - .setBody(constant("SELECT * FROM material LIMIT 10")) + .setBody(constant("SELECT * FROM material")) .to("jdbc:source") .split(body()) .to("activemq:queue:material"); + from("direct:getPackages") + .setHeader("material_id", simple("${body[id]}")) + .setBody(simple("SELECT * FROM package WHERE material_id = :?material_id")) + .to("jdbc:source?useHeadersAsParameters=true"); + from("activemq:queue:material") - .split(body()) - //.bean(myFunctions, "getId") - .setHeader("id", simple("${body[id]}")) - .setBody(constant("SELECT * FROM package WHERE material_id = :?id")) - .to("jdbc:source?useHeadersAsParameters=true") - .to("activemq:queue:materialPackage"); - //.to("log:?level=INFO&showBody=true"); + .enrich("direct:getPackages",new MyAggregator()) + .to("activemq:queue:materialPackage") + .to("log:?level=INFO&showBody=true"); } }); diff --git a/src/main/java/com/release11/MyAggregator.java b/src/main/java/com/release11/MyAggregator.java index f82b885..488d3f9 100644 --- a/src/main/java/com/release11/MyAggregator.java +++ b/src/main/java/com/release11/MyAggregator.java @@ -3,15 +3,26 @@ package com.release11; import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + public class MyAggregator implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { - String oldEx = oldExchange.getIn().getBody().toString(); + HashMap oldEx = (HashMap) oldExchange.getIn().getBody(); System.out.println(oldEx); - String newEx = newExchange.getIn().getBody().toString(); + ArrayList newEx = (ArrayList) newExchange.getIn().getBody(); System.out.println(newEx); - return null; + + oldEx.put("packages", newEx); + Exchange result = oldExchange; + result.getIn().setBody(oldEx); + //result.getIn().setBody(body); + System.out.println(result.getIn().getBody()); + + return result; } } diff --git a/target/classes/com/release11/Main$1.class b/target/classes/com/release11/Main$1.class index 06d02fcbf66fb55a10acb8d49af2fc4fe09e75b0..0941cc6bd7751bf0f92c4a03e82d6d2f916e71a2 100644 GIT binary patch delta 586 zcmZ9JU2D`p7=_Oy*%)K25H(<>6h(!`((Trd)@--i53TNMyVX|LTC1d+F^#4hn~lmM z#h)O{crE^gf~24zf_USN{u3{~6W^?gA2TrTne(0*9%jDn{jg8}^!?2{043b%=pUKY z`C@H<9hS|2r8(&BY`pM8Dm6)hqg?1G0uo;)QbL zsD>&-Ul{pLIpmLdSgOp=)=sq6qbJkEUrORf;voZH{;KLDJ?#ioUvS4rHmJx0qbs!E}n5taXuES*>J|KM)Rj#547fKSG}| zfl*X2MnDd3qX-Y<2{M2Ix0HZ)?;cIvaG^-)EurM@L}QDzpCV5Hp;5x+s+kFahe zBe4*Pu(07*_zQNpQw!e9yXV|<@5?>!J+*$eNUvY_4**iwxt5+;_w{>+FIdPGbDKde zR{w|z0m0NoXbjE07YCMpzGv#?O9>K&WsG2yZ@Kc~AjV}(Ai-~3N8Tn(GKBdz>uk@S zomP!n&9qKdEZq>1Vu6z5|Zc?F~c|9s^}yE5g8uxgt*}`S%JTKGX6CN<`6;_bicx8YN5glvSBh^oYF#BGc{BB?4T~8B16g|^+S=z2b`H}*nAYu!Y6+e;1DiFVt7ENj}CcF&GWV^WSHoIH3|4Mz( z;DhnmpW=@&-q~%jwLqH8^xk{!nRD;l*x4WuL(zg^3kkPwqQ^!5xl4~ z%;fT=K3G}!KDUIC#zhRN7}k)%2*c!AKn9iQw%x-P!x$0%FVj#l#*hi!&A@i_=iINB zc_WDlTv9QqA&aS=@+vJq5H+gGZBmir5u@$uuROaRR02=%nvuc`E~}W;a0OQxMnn2u z3<+z<42LQm&-Lrd@2QyYq=w42Jdd|ZwjT^)fnkuE_l`Q@r!a~|4L6V@O%17O{S>-W z4(NBurY>n%mYP*RYMY7z!<6aPbWb>f`(kZPFSoWV%M%t4Tu+70u+Z234#0Nn)Ds>E zi!LCs;~vrl50&h?*lpGhg!h^sIOG{Axh8k^xMxed?Mwt!+ov%ooqf2$kmPMCL{rK2 z4@_sSjB`S#%-KmAL)vu?J0`>QIXr}|ix2;>N|;HP7`3{4zDT(eVoye>_l_w(;0eF! z(0xwycBj?obZDuI_hdRYyPWM|Hi|@-s<=(h6i;p0@J#m=uFCPm99HN_Gpy44mODLb zT5po2)6QsDmcD^~4#R7-$}o_^En2k*fZMo37Q(IcWxs=?rehmg^_D=zy4-