added myaggregator and modified classes
This commit is contained in:
73
2021.11.16.txt
Normal file
73
2021.11.16.txt
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
package com.release11;
|
||||||
|
|
||||||
|
import com.mysql.cj.jdbc.MysqlDataSource;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||||
|
import org.apache.camel.*;
|
||||||
|
import org.apache.camel.builder.RouteBuilder;
|
||||||
|
import org.apache.camel.component.jms.JmsComponent;
|
||||||
|
import org.apache.camel.impl.DefaultCamelContext;
|
||||||
|
import org.apache.camel.support.SimpleRegistry;
|
||||||
|
import org.apache.log4j.BasicConfigurator;
|
||||||
|
|
||||||
|
public class Main {
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
//MyBuilder myBuilder = new MyBuilder();
|
||||||
|
//myBuilder.doRandomMaterials();
|
||||||
|
//myBuilder.doRandomPackage();
|
||||||
|
|
||||||
|
//to("log:?level=INFO&showHeaders=true&showBody=true")
|
||||||
|
|
||||||
|
MysqlDataSource source = new MysqlDataSource();
|
||||||
|
String jacek = "jdbc:mysql://10.101.111.19:3306/camel_db";
|
||||||
|
source.setURL(jacek);
|
||||||
|
source.setUser("root");
|
||||||
|
source.setPassword("admin");
|
||||||
|
SimpleRegistry registry = new SimpleRegistry();
|
||||||
|
registry.bind("source", MysqlDataSource.class, source);
|
||||||
|
CamelContext context = new DefaultCamelContext(registry);
|
||||||
|
BasicConfigurator.configure();
|
||||||
|
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.101.111.19:8088");
|
||||||
|
connectionFactory.setUserName("admin");
|
||||||
|
connectionFactory.setPassword("admin");
|
||||||
|
context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
|
||||||
|
|
||||||
|
DAO myFunctions = new DAO();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
context.addRoutes(new RouteBuilder() {
|
||||||
|
@Override
|
||||||
|
public void configure() throws Exception {
|
||||||
|
|
||||||
|
from("direct:start")
|
||||||
|
.setBody(constant("SELECT * FROM material LIMIT 10"))
|
||||||
|
.to("jdbc:source")
|
||||||
|
.split(body())
|
||||||
|
.to("activemq:queue:material");
|
||||||
|
|
||||||
|
from("activemq:queue:material")
|
||||||
|
.split(body())
|
||||||
|
//.bean(myFunctions, "getId")
|
||||||
|
.setHeader("id", simple("${body[id]}"))
|
||||||
|
.setBody(constant("SELECT * FROM package WHERE material_id = :?id"))
|
||||||
|
.to("jdbc:source?useHeadersAsParameters=true")
|
||||||
|
.to("activemq:queue:materialPackage");
|
||||||
|
//.to("log:?level=INFO&showBody=true");
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
context.start();
|
||||||
|
ProducerTemplate template = context.createProducerTemplate();
|
||||||
|
template.sendBody("direct:start", null);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
//template.setDefaultEndpointUri("activemq:queue:material");
|
||||||
|
|
||||||
|
context.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -45,19 +45,20 @@ public class Main {
|
|||||||
public void configure() throws Exception {
|
public void configure() throws Exception {
|
||||||
|
|
||||||
from("direct:start")
|
from("direct:start")
|
||||||
.setBody(constant("SELECT * FROM material LIMIT 10"))
|
.setBody(constant("SELECT * FROM material"))
|
||||||
.to("jdbc:source")
|
.to("jdbc:source")
|
||||||
.split(body())
|
.split(body())
|
||||||
.to("activemq:queue:material");
|
.to("activemq:queue:material");
|
||||||
|
|
||||||
|
from("direct:getPackages")
|
||||||
|
.setHeader("material_id", simple("${body[id]}"))
|
||||||
|
.setBody(simple("SELECT * FROM package WHERE material_id = :?material_id"))
|
||||||
|
.to("jdbc:source?useHeadersAsParameters=true");
|
||||||
|
|
||||||
from("activemq:queue:material")
|
from("activemq:queue:material")
|
||||||
.split(body())
|
.enrich("direct:getPackages",new MyAggregator())
|
||||||
//.bean(myFunctions, "getId")
|
.to("activemq:queue:materialPackage")
|
||||||
.setHeader("id", simple("${body[id]}"))
|
.to("log:?level=INFO&showBody=true");
|
||||||
.setBody(constant("SELECT * FROM package WHERE material_id = :?id"))
|
|
||||||
.to("jdbc:source?useHeadersAsParameters=true")
|
|
||||||
.to("activemq:queue:materialPackage");
|
|
||||||
//.to("log:?level=INFO&showBody=true");
|
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -3,15 +3,26 @@ package com.release11;
|
|||||||
import org.apache.camel.AggregationStrategy;
|
import org.apache.camel.AggregationStrategy;
|
||||||
import org.apache.camel.Exchange;
|
import org.apache.camel.Exchange;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class MyAggregator implements AggregationStrategy {
|
public class MyAggregator implements AggregationStrategy {
|
||||||
@Override
|
@Override
|
||||||
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
|
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
|
||||||
|
|
||||||
String oldEx = oldExchange.getIn().getBody().toString();
|
HashMap oldEx = (HashMap) oldExchange.getIn().getBody();
|
||||||
System.out.println(oldEx);
|
System.out.println(oldEx);
|
||||||
String newEx = newExchange.getIn().getBody().toString();
|
ArrayList<HashMap> newEx = (ArrayList) newExchange.getIn().getBody();
|
||||||
System.out.println(newEx);
|
System.out.println(newEx);
|
||||||
|
|
||||||
return null;
|
|
||||||
|
oldEx.put("packages", newEx);
|
||||||
|
Exchange result = oldExchange;
|
||||||
|
result.getIn().setBody(oldEx);
|
||||||
|
//result.getIn().setBody(body);
|
||||||
|
System.out.println(result.getIn().getBody());
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
BIN
target/classes/com/release11/MyAggregator.class
Normal file
BIN
target/classes/com/release11/MyAggregator.class
Normal file
Binary file not shown.
Reference in New Issue
Block a user