ActiveMQ Dead Letter Queue with Spring Boot











up vote
0
down vote

favorite












I would like to implement a basic deadLetterQueue with using Java DSL, an embedded ActiveMQ and KahaDB as persistence adapter. I have a text file in my C:/JavaProjects/tmp/input directory. I would like to take the file into an internal queue, produce an unsuccessful delivery with calling .rollback(), then the message should forward to deadLetterQueue, after that I will take the data from deadLetterQueue and put it to C:/JavaProjects/tmp/output directory. All messages are successfully delivered to incoming but apparently they do not go to dead letter queue. How do I do that?



Here is my main class:



import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.io.File;
import java.util.Arrays;

@SpringBootApplication
public class DemoApplication {
public static void main(String args) {
SpringApplication.run(DemoApplication.class, args);
}

@Value("${activemq.broker-url}")
private String brokerUrl;

@Value("${activemq.user}")
private String userName;

@Value("${activemq.password}")
private String password;

@Bean
public BrokerService setupBroker() {

try {
brokerService.addConnector(brokerUrl);
brokerService.setUseJmx(true);
brokerService.setUseShutdownHook(false);
brokerService.setAdvisorySupport(true);
brokerService.setEnableStatistics(true);

brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
brokerService.setTempDataStore(createKahaDBTempDataStore());

brokerService.start();

Thread.sleep(6000);

brokerService.stop();

} catch (Exception e) {
e.printStackTrace();
}
return brokerService;
}

private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
kahaDBPersistenceAdapter.setDirectory(new File("C:\JavaProjects\tmp", "activemq/kahadb"));
kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
return kahaDBPersistenceAdapter;
}

private PListStoreImpl createKahaDBTempDataStore() {
final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
tempKahaDBStore.setDirectory(new File("C:\JavaProjects\tmp", "activemq/tmp"));
return tempKahaDBStore;
}

@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setTrustedPackages(Arrays.asList("com.example.demo"));
return factory;
}
}


Here is my router:



@Component
public class MyRouter extends RouteBuilder {

private static Logger logger = LoggerFactory.getLogger(MyRouter.class);

@Override
public void configure() throws Exception {


onException(Exception.class)
.log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());

errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
.deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
.useOriginalMessage()
.backOffMultiplier(2)
.redeliveryDelay(100)
.maximumRedeliveries(3)
.log("TO DEAD LETTER QUEUE!"));

/*
* Receiving data to an internal queue "incoming",
* then calling rollback() to send the info from internal queue to dead Letter queue
* */

from("file:/JavaProjects/tmp/input?noop=true")
.to("activemq:queue:incoming");
from("activemq:queue:incoming")
.process(new MyProcessor())
.rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
.to("file:/JavaProjects/tmp/output");

//Getting data from Dead Letter Queue and put it to our output folder

from("activemq:queue:deadLetterQueue")
.process(new MyProcessor())
.to("file:/JavaProjects/tmp/output");
}
}


Here is my processor:



    public class MyProcessor implements Processor{

@Override
public void process(Exchange exchange) throws Exception {

System.out.println("nMessage ID is: " + exchange.getIn().getMessageId());
System.out.println("nFile`s content is: " + exchange.getIn().getBody(String.class) + "n");

}
}


Any help would be appreciated.










share|improve this question




























    up vote
    0
    down vote

    favorite












    I would like to implement a basic deadLetterQueue with using Java DSL, an embedded ActiveMQ and KahaDB as persistence adapter. I have a text file in my C:/JavaProjects/tmp/input directory. I would like to take the file into an internal queue, produce an unsuccessful delivery with calling .rollback(), then the message should forward to deadLetterQueue, after that I will take the data from deadLetterQueue and put it to C:/JavaProjects/tmp/output directory. All messages are successfully delivered to incoming but apparently they do not go to dead letter queue. How do I do that?



    Here is my main class:



    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
    import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    import java.io.File;
    import java.util.Arrays;

    @SpringBootApplication
    public class DemoApplication {
    public static void main(String args) {
    SpringApplication.run(DemoApplication.class, args);
    }

    @Value("${activemq.broker-url}")
    private String brokerUrl;

    @Value("${activemq.user}")
    private String userName;

    @Value("${activemq.password}")
    private String password;

    @Bean
    public BrokerService setupBroker() {

    try {
    brokerService.addConnector(brokerUrl);
    brokerService.setUseJmx(true);
    brokerService.setUseShutdownHook(false);
    brokerService.setAdvisorySupport(true);
    brokerService.setEnableStatistics(true);

    brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
    brokerService.setTempDataStore(createKahaDBTempDataStore());

    brokerService.start();

    Thread.sleep(6000);

    brokerService.stop();

    } catch (Exception e) {
    e.printStackTrace();
    }
    return brokerService;
    }

    private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
    final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
    kahaDBPersistenceAdapter.setDirectory(new File("C:\JavaProjects\tmp", "activemq/kahadb"));
    kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
    kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
    return kahaDBPersistenceAdapter;
    }

    private PListStoreImpl createKahaDBTempDataStore() {
    final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
    tempKahaDBStore.setDirectory(new File("C:\JavaProjects\tmp", "activemq/tmp"));
    return tempKahaDBStore;
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
    final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
    factory.setTrustedPackages(Arrays.asList("com.example.demo"));
    return factory;
    }
    }


    Here is my router:



    @Component
    public class MyRouter extends RouteBuilder {

    private static Logger logger = LoggerFactory.getLogger(MyRouter.class);

    @Override
    public void configure() throws Exception {


    onException(Exception.class)
    .log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());

    errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
    .deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
    .useOriginalMessage()
    .backOffMultiplier(2)
    .redeliveryDelay(100)
    .maximumRedeliveries(3)
    .log("TO DEAD LETTER QUEUE!"));

    /*
    * Receiving data to an internal queue "incoming",
    * then calling rollback() to send the info from internal queue to dead Letter queue
    * */

    from("file:/JavaProjects/tmp/input?noop=true")
    .to("activemq:queue:incoming");
    from("activemq:queue:incoming")
    .process(new MyProcessor())
    .rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
    .to("file:/JavaProjects/tmp/output");

    //Getting data from Dead Letter Queue and put it to our output folder

    from("activemq:queue:deadLetterQueue")
    .process(new MyProcessor())
    .to("file:/JavaProjects/tmp/output");
    }
    }


    Here is my processor:



        public class MyProcessor implements Processor{

    @Override
    public void process(Exchange exchange) throws Exception {

    System.out.println("nMessage ID is: " + exchange.getIn().getMessageId());
    System.out.println("nFile`s content is: " + exchange.getIn().getBody(String.class) + "n");

    }
    }


    Any help would be appreciated.










    share|improve this question


























      up vote
      0
      down vote

      favorite









      up vote
      0
      down vote

      favorite











      I would like to implement a basic deadLetterQueue with using Java DSL, an embedded ActiveMQ and KahaDB as persistence adapter. I have a text file in my C:/JavaProjects/tmp/input directory. I would like to take the file into an internal queue, produce an unsuccessful delivery with calling .rollback(), then the message should forward to deadLetterQueue, after that I will take the data from deadLetterQueue and put it to C:/JavaProjects/tmp/output directory. All messages are successfully delivered to incoming but apparently they do not go to dead letter queue. How do I do that?



      Here is my main class:



      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.broker.BrokerService;
      import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
      import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;
      import org.springframework.context.annotation.Bean;

      import java.io.File;
      import java.util.Arrays;

      @SpringBootApplication
      public class DemoApplication {
      public static void main(String args) {
      SpringApplication.run(DemoApplication.class, args);
      }

      @Value("${activemq.broker-url}")
      private String brokerUrl;

      @Value("${activemq.user}")
      private String userName;

      @Value("${activemq.password}")
      private String password;

      @Bean
      public BrokerService setupBroker() {

      try {
      brokerService.addConnector(brokerUrl);
      brokerService.setUseJmx(true);
      brokerService.setUseShutdownHook(false);
      brokerService.setAdvisorySupport(true);
      brokerService.setEnableStatistics(true);

      brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
      brokerService.setTempDataStore(createKahaDBTempDataStore());

      brokerService.start();

      Thread.sleep(6000);

      brokerService.stop();

      } catch (Exception e) {
      e.printStackTrace();
      }
      return brokerService;
      }

      private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
      final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
      kahaDBPersistenceAdapter.setDirectory(new File("C:\JavaProjects\tmp", "activemq/kahadb"));
      kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
      kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
      return kahaDBPersistenceAdapter;
      }

      private PListStoreImpl createKahaDBTempDataStore() {
      final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
      tempKahaDBStore.setDirectory(new File("C:\JavaProjects\tmp", "activemq/tmp"));
      return tempKahaDBStore;
      }

      @Bean
      public ActiveMQConnectionFactory activeMQConnectionFactory() {
      final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
      factory.setTrustedPackages(Arrays.asList("com.example.demo"));
      return factory;
      }
      }


      Here is my router:



      @Component
      public class MyRouter extends RouteBuilder {

      private static Logger logger = LoggerFactory.getLogger(MyRouter.class);

      @Override
      public void configure() throws Exception {


      onException(Exception.class)
      .log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());

      errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
      .deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
      .useOriginalMessage()
      .backOffMultiplier(2)
      .redeliveryDelay(100)
      .maximumRedeliveries(3)
      .log("TO DEAD LETTER QUEUE!"));

      /*
      * Receiving data to an internal queue "incoming",
      * then calling rollback() to send the info from internal queue to dead Letter queue
      * */

      from("file:/JavaProjects/tmp/input?noop=true")
      .to("activemq:queue:incoming");
      from("activemq:queue:incoming")
      .process(new MyProcessor())
      .rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
      .to("file:/JavaProjects/tmp/output");

      //Getting data from Dead Letter Queue and put it to our output folder

      from("activemq:queue:deadLetterQueue")
      .process(new MyProcessor())
      .to("file:/JavaProjects/tmp/output");
      }
      }


      Here is my processor:



          public class MyProcessor implements Processor{

      @Override
      public void process(Exchange exchange) throws Exception {

      System.out.println("nMessage ID is: " + exchange.getIn().getMessageId());
      System.out.println("nFile`s content is: " + exchange.getIn().getBody(String.class) + "n");

      }
      }


      Any help would be appreciated.










      share|improve this question















      I would like to implement a basic deadLetterQueue with using Java DSL, an embedded ActiveMQ and KahaDB as persistence adapter. I have a text file in my C:/JavaProjects/tmp/input directory. I would like to take the file into an internal queue, produce an unsuccessful delivery with calling .rollback(), then the message should forward to deadLetterQueue, after that I will take the data from deadLetterQueue and put it to C:/JavaProjects/tmp/output directory. All messages are successfully delivered to incoming but apparently they do not go to dead letter queue. How do I do that?



      Here is my main class:



      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.broker.BrokerService;
      import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
      import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;
      import org.springframework.context.annotation.Bean;

      import java.io.File;
      import java.util.Arrays;

      @SpringBootApplication
      public class DemoApplication {
      public static void main(String args) {
      SpringApplication.run(DemoApplication.class, args);
      }

      @Value("${activemq.broker-url}")
      private String brokerUrl;

      @Value("${activemq.user}")
      private String userName;

      @Value("${activemq.password}")
      private String password;

      @Bean
      public BrokerService setupBroker() {

      try {
      brokerService.addConnector(brokerUrl);
      brokerService.setUseJmx(true);
      brokerService.setUseShutdownHook(false);
      brokerService.setAdvisorySupport(true);
      brokerService.setEnableStatistics(true);

      brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
      brokerService.setTempDataStore(createKahaDBTempDataStore());

      brokerService.start();

      Thread.sleep(6000);

      brokerService.stop();

      } catch (Exception e) {
      e.printStackTrace();
      }
      return brokerService;
      }

      private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
      final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
      kahaDBPersistenceAdapter.setDirectory(new File("C:\JavaProjects\tmp", "activemq/kahadb"));
      kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
      kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
      return kahaDBPersistenceAdapter;
      }

      private PListStoreImpl createKahaDBTempDataStore() {
      final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
      tempKahaDBStore.setDirectory(new File("C:\JavaProjects\tmp", "activemq/tmp"));
      return tempKahaDBStore;
      }

      @Bean
      public ActiveMQConnectionFactory activeMQConnectionFactory() {
      final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
      factory.setTrustedPackages(Arrays.asList("com.example.demo"));
      return factory;
      }
      }


      Here is my router:



      @Component
      public class MyRouter extends RouteBuilder {

      private static Logger logger = LoggerFactory.getLogger(MyRouter.class);

      @Override
      public void configure() throws Exception {


      onException(Exception.class)
      .log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());

      errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
      .deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
      .useOriginalMessage()
      .backOffMultiplier(2)
      .redeliveryDelay(100)
      .maximumRedeliveries(3)
      .log("TO DEAD LETTER QUEUE!"));

      /*
      * Receiving data to an internal queue "incoming",
      * then calling rollback() to send the info from internal queue to dead Letter queue
      * */

      from("file:/JavaProjects/tmp/input?noop=true")
      .to("activemq:queue:incoming");
      from("activemq:queue:incoming")
      .process(new MyProcessor())
      .rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
      .to("file:/JavaProjects/tmp/output");

      //Getting data from Dead Letter Queue and put it to our output folder

      from("activemq:queue:deadLetterQueue")
      .process(new MyProcessor())
      .to("file:/JavaProjects/tmp/output");
      }
      }


      Here is my processor:



          public class MyProcessor implements Processor{

      @Override
      public void process(Exchange exchange) throws Exception {

      System.out.println("nMessage ID is: " + exchange.getIn().getMessageId());
      System.out.println("nFile`s content is: " + exchange.getIn().getBody(String.class) + "n");

      }
      }


      Any help would be appreciated.







      java spring spring-boot activemq dead-letter






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 9 at 13:11

























      asked Nov 9 at 11:06









      Serhat

      388




      388





























          active

          oldest

          votes











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53224541%2factivemq-dead-letter-queue-with-spring-boot%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown






























          active

          oldest

          votes













          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes
















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.





          Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


          Please pay close attention to the following guidance:


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53224541%2factivemq-dead-letter-queue-with-spring-boot%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          這個網誌中的熱門文章

          Hercules Kyvelos

          Tangent Lines Diagram Along Smooth Curve

          Yusuf al-Mu'taman ibn Hud