Apprendre Framework Apache Camel par la pratique

ApprendreFramework Apache Camel par la pratique
Pourquoi mes en-têtes disparaissent-ils?
▶ Comment fonctionne le pipeline Camel:
à partir de ("cxf: bean: orderEntry")
.to (“haricot: monService”)
.to (“cxf: bean: mySoapService”)
.to (“activemq: queue: ORDER_ENTRY”);
▶ Le résultat de la “boîte” précédente est l'entrée de la suivante
▶ Le pipeline s'en occupe
▶ ex. copiez le message de sortie de la "boîte" précédente (le cas échéant) dans le message d'entrée avant d'appeler la "boîte" suivante.
Pourquoi mes en-têtes disparaissent-ils?
▶ Vous définissez le résultat dans votre processeur / haricot dans le message sortant sans copier les en-têtes d'entrée (et les pièces jointes):
Classe publique MyProcessorimplements Processor {
processus d'annulation publique (Exchange Exchange) lève Exception {
Résultat de l'objet =…
…
exchange.getOut (). setBody (résultat);
}
}
▶ Placez le résultat dans le message et laissez le pipeline de Camel faire le travail:
Classe publique MyProcessorimplements Processor {
processus d'annulation publique (Exchange Exchange) lève Exception {
Résultat de l'objet =…
…
exchange.getIn (). setBody (résultat);
}
}
▶ Ou copiez les en-têtes du message entrant (et les pièces jointes) dans le message sortant si l'échange est compatible:
Classe publique MyProcessorimplements Processor {
processus d'annulation publique (Exchange Exchange) lève Exception {
Résultat de l'objet =…
if (exchange.getPattern (). isOutCapable ()) {
exchange.getOut (). setHeaders (exchange.getIn (). getHeaders ());
exchange.getOut (). setAttachments (exchange.getIn (). getAttachments ());
exchange.getOut (). setBody (résultat);
} autre {
…
}
}
}
Gestion des exceptions
▶ Camel prend en charge la gestion des erreurs globales (par contexte Camel) et à étendue de route
▶ Par défaut, Camel utilise le DefaultErrorHandlerto pour gérer les exceptions qui:
- Ne pas renvoyer les échanges
- Propage les exceptions à l'appelant
▶ Camel fournit également les gestionnaires d'erreurs suivants:
- NoErrorHandler
- LoggingErrorHandler
- DeadLetterErrorHandler
- TransactionErrorHandler
▶ Vous pouvez configurer le comportement des gestionnaires d'erreur comme suit:
- nombre de relocalisations / relocalisation pendant (Expression)
- délai de livraison
- redistribution du multiplicateur
- utiliser le message d'origine
-…
Gestion des exceptions
▶ Configurez le gestionnaire d'erreurs global DeadLetterErrorHandleras:
- L'exception sera traitée et ne sera pas renvoyée à l'appelant.
- Cela vous permettra de renvoyer l’échange au max. 5 fois
- Il faudra attendre 1 seconde pour la prochaine livraison
- L'échange ayant échoué sera déplacé dans le point final de la lettre morte
errorHandler (
deadLetterChannel ("activemq: queue: DLQ")
.maximumRedeliveries (5)
.redeliveryDelay (1000));
from ("activemq: queue: start"). routeId ("route-1")
.to (“bean: service1”) // lève une exception Service1Exception
.à("…");
Gestion des exceptions
▶ Pour les exemples suivants, supposons que nous avons l'itinéraire suivant:
from ("cxf: bean: mySoapService"). routeId ("route-1")
.to (“bean: service1”) // lève une exception Service1Exception
.to (“direct: sub”);
from (“direct: sub”). routeId (“route-2”)
.to (“bean: service2”) // lève une exception Service2Exception
.à("…");
Gestion des exceptions
▶ Gestion globale des exceptions:
- Les deux exceptions doivent être traitées de la même manière
- Arrêtez pour continuer l'acheminement de l'échange
- L'exception ne doit pas être renvoyée à l'appelant
onException (Exception.class)
.handled (true)
.to (“bean: globalExceptionHandler”);
from ("cxf: bean: mySoapService"). routeId ("route-1")
.to (“bean: service1”) // lève une exception Service1Exception
.to (“direct: sub”);
from (“direct: sub”). routeId (“route-2”)
.to (“bean: service2”) // lève une exception Service2Exception
.à("…");
Gestion globale des exceptions:
- Les deux exceptions doivent être traitées de la même manière
- Arrêtez pour continuer l'acheminement de l'échange
- l'exception doit être renvoyée à l'appelant
onException (Exception.class)
.handled (false)
.to (“bean: globalExceptionHandler”);
from ("cxf: bean: mySoapService"). routeId ("route-1")
.to (“bean: service1”) // lève une exception Service1Exception
.to (“direct: sub”);
from (“direct: sub”). routeId (“route-2”)
.to (“bean: service2”) // lève une exception Service2Exception
.à("…");
Gestion des exceptions
▶ Gestion globale des exceptions:
- Les deux exceptions doivent être traitées de la même manière
- Continuer l'acheminement de l'échange
- L'exception ne doit pas être renvoyée à l'appelant
onException (Exception.class)
.continued (true)
.to (“bean: globalExceptionHandler”);
from ("cxf: bean: mySoapService"). routeId ("route-1")
.to (“bean: service1”) // lève une exception Service1Exception
.to (“direct: sub”);
from (“direct: sub”). routeId (“route-2”)
.to (“bean: service2”) // lève une exception Service2Exception
.à("…");
Gestion des exceptions
▶ Gestion des exceptions de portée de routage:
- Service1Exception doit être traité d'une manière différente (pas globale)
- Arrête l'acheminement de l'échange
- L'exception ne doit pas être renvoyée à l'appelant
onException (Exception.class)
.handled (true)
.to (“bean: globalExceptionHandler”);
from ("cxf: bean: mySoapService"). routeId ("route-1")
.onException (Service1Exception.class)
.handled (true)
.to (“bean: service1ExceptionHandler”);
.fin()
.to (“bean: service1”) // lève une exception Service1Exception
.to (“direct: sub”);
from (“direct: sub”). routeId (“route-2”)
.to (“bean: service2”) // lève une exception Service2Exception
.à("…");
Gestion des exceptions
▶ Gestion des exceptions de portée de routage:
- Service1Exception et Service2Exception doivent être traités de manière différente (pas globale)
- Arrête l'acheminement de l'échange
- L'exception ne doit pas être renvoyée à l'appelant
onException (Exception.class)
.handled (true)
.to (“bean: globalExceptionHandler”);
from ("cxf: bean: mySoapService"). routeId ("route-1")
.onException (Service1Exception.class, Service2Exception.class)
.handled (true)
.to (“bean: serviceExceptionHandler”);
.fin()
.to (“bean: service1”) // lève une exception Service1Exception
.to (“direct: sub”);
from (“direct: sub”). routeId (“route-2”)
.errorHandler (noErrorHandler ())
.to (“bean: service2”) // lève une exception Service2Exception
.à("…");
Pourquoi mes itinéraires et contextes ont-ils des noms imprévisibles?
<? xml version = "1.0" encoding = "UTF-8"?>
<haricots…>
<camel: camelContext>
<chameau: route>
<chameau: de uri = "direct: A" />
<chameau: à uri = "direct: B" />
</ camel: route>
<chameau: route>
<chameau: de uri = "direct: B" />
<chameau: à uri = "direct: C" />
</ camel: route>
</ camel: camelContext>
<camel: camelContext>
<chameau: route>
<chameau: de uri = "direct: C" />
<chameau: à uri = "direct: D" />
</ camel: route>
<chameau: route>
<chameau: de uri = "direct: D" />
<chameau: à uri = "direct: E" />
</ camel: route>
</ camel: camelContext>
</ beans>
Pourquoi mes itinéraires et contextes ont-ils des noms imprévisibles?
<? xml version = "1.0" encoding = "UTF-8"?>
<haricots…>
<camel: camelContext id = "contexte de A à B et de B à C">
<chameau: route id = "route-à-b">
<chameau: de uri = "direct: A" />
<chameau: à uri = "direct: B" />
</ camel: route>
<chameau: route id = "route-B-à-C">
<chameau: de uri = "direct: B" />
<chameau: à uri = "direct: C" />
</ camel: route>
</ camel: camelContext>
<camel: camelContext id = "contexte-C-à-D-et-D-à-E">
<camel: route id = "route-C-to-D">
<chameau: de uri = "direct: C" />
<chameau: à uri = "direct: D" />
</ camel: route>
<chameau: route id = "route-D-à-E">
<chameau: de uri = "direct: D" />
<chameau: à uri = "direct: E" />
</ camel: route>
</ camel: camelContext>
</ beans>
Pourquoi mes itinéraires et contextes ont-ils des noms imprévisibles?
La classe publique SampleRoute étend RouteBuilder {
public void configure () lève Exception {
from ("direct: A"). routeId ("route-A-to-B")
.to ("direct: B");
…
}
}
<? xml version = "1.0" encoding = "UTF-8"?>
<haricots…>
<bean id = “sampleRoute” class = “… SampleRoute” />
<camel: camelContext id = "contexte de A à B et de B à C">
<camel: routeBuilder ref = “sampleRoute” />
…
</ camel: camelContext>
</ beans>
classe publique Main {
public static void main (String ... args) {
…
DefaultCamelContext ctx = new DefaultCamelContext ();
ctx.setManagementName ("contexte-A-à-B-et-B-à-C");
…
}
}
Comment démarrer / arrêter ou suspendre / reprendre les itinéraires au moment de l'exécution?
▶ Utilisez le support RoutePolicy / RoutePolicySupport.
- ThrottlingInflightRoutePolicy
- SimpleScheduledRoutePolicy / CronScheduledRoutePolicy
- créez votre propre RoutePolicy
▶ Utilisez l'API Camel (nous le verrons plus tard).
interface publique RoutePolicy {
void onInit (Route route);
annulation sur la route (itinéraire);
vide onStart (itinéraire);
void onStop (itinéraire);
vide onSuspend (itinéraire);
vide onResume (itinéraire);
void onExchangeBegin (Itinéraire, Exchange);
void onExchangeDone (Route Route, Exchange Exchange);
}
Comment démarrer / arrêter ou suspendre / reprendre les itinéraires au moment de l'exécution?
▶ Supposez l'exigence suivante pour votre itinéraire planifié:
- L'itinéraire doit être planifié à partir d'un planificateur externe via des messages de commande JMS.
RoutePolicypolicy = new MyCustomRoutePolicy («activemq: queue: command»);
à partir de ("seda: start"). routeId ("scheduleRoute")
.noAutoStartup ()
.routePolicy (politique)
…
.to ("mock: end");
Comment démarrer / arrêter ou suspendre / reprendre les itinéraires au moment de l'exécution?
Classe publique MyCustomRoutePolicyextends RoutePolicySupport {
private String endpointUrl;
public MyCustomRoutePolicy (String endpointUrl) {
this.endpointUrl = endpointUrl;
}
public void onInit (itinéraire final de l'itinéraire) {
CamelContextcamelContext = route.getRouteContext (). GetCamelContext ();
Noeud final Endpoint = camelContext.getEndpoint (endpointUrl);
endpoint.createConsumer (new Processor () {
processus d'annulation publique (Exchange Exchange) lève Exception {
Commande de chaîne = exchange.getIn (). GetBody (String.class);
if ("start" .equals (command)) {
startRoute (route);
} else if ("resume" .equals (command)) {
resumeRoute (route);
} else if ("stop" .equals (command)) {
stopRoute (route);
} else if ("suspendre" .equals (commande)) {
suspendRoute (route);
}
}).début();
}
}
Comment configurer les itinéraires au moment de l'exécution?
▶ Camel dispose d'une API Java qui vous permet d'ajouter / de modifier / de supprimer des itinéraires au moment de l'exécution
- context.addRoutes (routeBuilderInstance)
- context.getRoute («routeId»)
- context.removeRoute («routeId»)
▶ Et comme mentionné précédemment aussi pour démarrer / arrêter et reprendre / suspendre les itinéraires
- context.startRoute («routeId»)
- context.stopRoute («routeId»)
- context.resumeRoute («routeId»)
- context.suspendRoute («routeId»);
Comment configurer les itinéraires au moment de l'exécution?
Échantillon 1
▶ Modification des ordinateurs d'extrémité à l'exécution:
from ("cxf: bean: ORDER_ENTRY"). routeId ("orderEntrySOAP")
…
.setHeader (“ENQUEUE_TIME”, System.currentTimeMillies ())
.to (“seda: ORDER_ENTRY”)
from ("seda: ORDER_ENTRY"). routeId ("orderEntry")
.setHeader (“DEQUEUE_TIME”, System.currentTimeMillies ())
.to (“bean: orderEntryService? method = timeConsumingProcessing”)
.to (“bean: performanceMonitor? method = adjustConcurrentConsumers”);
Échantillon 1
Classe publique PerformanceMonitor {
public void adjustConcurrentConsumers (Exchange Exchange) lève Exception {
long enqueueTime = exchange.getIn (). getHeader (”ENQUEUE_TIME”, Long.class);
long dequeueTime = exchange.getIn (). getHeader (”DEQUEUE_TIME”, Long.class);
if ((dequeueTime – enqueueTime)> 5000) {
CamelContext context = exchange.getContext ();
// seulement arrêter / démarrer le consommateur ne fonctionne pas (encore)
context.stopRoute ("orderEntry");
Route orderEntryRoute = context.getRoute ("orderEntry");
SedaEndpointendpoint = (SedaEndpoint) orderEntryRoute.getEndpoint ();
int consumerCount = endpoint.getConcurrentConsumers ();
endpoint.setConcurrentConsumers (consumerCount * 2);
context.startRoute ("orderEntry");
}
}
}
Comment configurer les itinéraires au moment de l'exécution?
Échantillon 2
▶ Route de traitement dédiée par client de manière statique:
from ("activemq: queue: ORDER_ENTRY"). routeId ("orderEntry")
.routingSlip (simple (“activemq: queue: ORDER_ENTRY. $ {header.COMPANY}”))
.fin();
from ("activemq: queue: ORDER_ENTRY.BANK1"). routeId ("orderEntryBank1")
.to (“bean: orderEntryService? method = timeConsumingProcessing”)
…
.fin();
…
from ("activemq: queue: ORDER_ENTRY.BANK9"). routeId ("orderEntryBank9")
.to (“bean: orderEntryService? method = timeConsumingProcessing”)
…
.fin();
Comment configurer les itinéraires au moment de l'exécution?
Échantillon 2
▶ Parcours de traitement dédié par client de manière dynamique:
from ("activemq: queue: ORDER_ENTRY"). routeId ("orderEntry")
.process (new DynamicRouteBuilderProcessor ())
.routingSlip (simple (“activemq: queue: ORDER_ENTRY. $ {header.COMPANY}”))
.fin();
Classe publique DynamicProcessorimplements Processor {
processus d'annulation publique (échange final) lance une exception {
final String company = exchange.getIn (). getHeader (“COMPANY”, String.class);
Route route = exchange.getContext (). GetRoute ("orderEntry" + société);
if (route == null) {
exchange.getContext (). addRoutes (new RouteBuilder () {
public void configure () lève Exception {
from ("activemq: queue: ORDER_ENTRY." + société) .routeId ("orderEntry" + société)
.to ("bean: orderEntryService? method = timeConsumingProcessing")
}
});
}
}
}
Comment les transactions fonctionnent-elles chez Camel?
▶ Tous les composants Camel ne sont PAS sensibles aux transactions!
▶ Les composants qui supportent les transactions sont:
- composant SQL
- Ibatis / MyBatiscomponent
- composant JPA
- Composant Hibernate
- composant JMS
- composant ActiveMQ
- Composant SJMS (Camel 2.11.0)
▶ Composants qui imitent le comportement de transaction:
- composant de fichier
- Composant FTP / SFTP / FTPS
- autres…
Comment les transactions fonctionnent-elles chez Camel?
▶ La prise en charge des transactions sur les chameaux s'appuie sur Springs PlatformTransactionManager
interface
- DataSourceTransactionManager
- JmsTransactionManager
- JpaTransactionManager / HibernateTransactionManager
- JtaTransactionManager
- et d'autres …
▶ Important: une transaction est associée à un seul thread d'exécution!
- Si vous utilisez “seda”, “vm”, “jms” ou tout autre protocole de votre sous-route qui traitera l'échange dans un autre thread, cette exécution ne fera pas partie de ce contexte de transaction!
▶ Une transaction n'est PAS associée à l'échange lui-même!
- Nous voulons prendre en charge les transactions asynchrones dans Camel 3.0.0.
▶ Consommer plusieurs échanges en une seule transaction n'est pas encore pris en charge.
- Le composant SJMS le supporte (Camel 2.11.0)
Comment les transactions fonctionnent-elles chez Camel?
▶ Votre système nécessite-t-il des transactions?
- Utilisez-vous des composants qui prennent en charge les transactions?
- Mettez-vous à jour le contenu (l’accès en lecture seule ne nécessite pas d’émission)?
- Mettez-vous à jour le contenu de la base de données à plusieurs endroits différents?
- Vous lisez / écrivez à partir de / dans plusieurs destinations JMS?
▶ Votre système nécessite-t-il des transactions XA?
- Vous avez accès à plus d’un composant transactionnel et les compensations ne fonctionnent pas pour vous?
▶ Que sont les compensations?
- En utilisant un TX normal et un «traitement» des erreurs (par exemple, des messages en double).
- Écrivez aux consommateurs idempotents (qui peuvent gérer les doublons).
- Exemple: File d'attente -> Mise à jour de la base de données -> File d'attente
Comment les transactions fonctionnent-elles chez Camel?
▶ Essayez d'éviter XA, parce que
- il est plus complexe à installer et facile à faire mal
- C’est plus cher et difficile à tester
- notre test unitaire “uniquement” teste les exceptions métier
- vous devez également tester les exceptions techniques pour vous assurer que cela fonctionnera
- Vous pouvez obtenir des résultats différents dans des environnements différents (système d'exploitation, disque, gestionnaire TX,…).
- vous devrez peut-être activer cette fonctionnalité explicitement (comme dans Oracle)
- il est plus lent (selon le fournisseur)
- votre ressource peut ne pas le supporter (comme HSQLDB)
- et ce n’est pas non plus pare-balles…
Comment configurer les transactions dans Camel?
▶ Un exemple typique avec JMS TX:
- Démarrer un envoi de messagerie
- Consommer un message d'une file d'attente
- Exécuter une logique métier
- Écrivez le message dans une autre file d'attente
- Commettez le TX de messagerie
Comment configurer les transactions dans Camel?
public void configure () lève Exception {
à partir de (“activemqTx: queue: transaction.incoming”)
.transacte («REQUIS»)
.to (“bean: businessService? method = computeOffer”)
.to (“activemqTx: queue: transaction.outgoing”);
}
Comment configurer les transactions dans Camel?
<bean id = ”txMgr" class = "org.springframework.jms.connection.JmsTransactionManager">
<nom de la propriété = "connectionFactory" ref = "connectionFactory" />
</ bean>
<id de bean = "REQUIRED" class = "org.apache.camel.spring.spi.SpringTransactionPolicy">
<nom de la propriété = "transactionManager" ref = "txMgr" />
<property name = "propagationBehaviorName" value = "PROPAGATION_REQUIRED" />
</ bean>
<bean id = ”connectionFactory" class = "org.apache.activemq.pool.PooledConnectionFactory">
<nom de la propriété = "maxConnections" value = "8" />
<nom de la propriété = "connectionFactory">
<bean class = "org.apache.activemq.ActiveMQConnectionFactory”>>
<property name = "brokerURL" value = "tcp: // localhost: 61616" />
</ bean>
</ property>
</ bean>
<bean id = "activemqTx" class = "org.apache.activemq.camel.component.ActiveMQComponent">
<nom de la propriété = "connectionFactory" ref = "connectionFactory" />
<nom de la propriété = "transactionné" value = "true" />
<nom de la propriété = "transactionManager" ref = "txMgr" />
</ bean>