Spring AMQP – Sender und Empfangen von Nachrichten

Stehe ich vor einem Problem, bei Empfang einer Nachricht von RabbitMQ.
Ich sende eine Nachricht wie unten

        HashMap<Object, Object> senderMap=new HashMap<>();
        senderMap.put("STATUS", "SUCCESS");
        senderMap.put("EXECUTION_START_TIME", new Date());

        rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);

Wenn wir sehen, RabbitMQ, dann bekommen wir einen vollqualifizierten Typ.

In der aktuellen Szenario, wir haben n Anzahl der Hersteller für die gleichen Verbraucher. Wenn ich ein mapper führt es zu einer Ausnahme.
Wie kann ich eine Nachricht senden, so dass es nicht enthalten type_id und ich kann die Meldung als Message-Objekt an und später kann ich binden Sie es auf meine benutzerdefinierte Objekt in den Empfänger.

Ich erhalte die Meldung unten.
Könnten Sie bitte lassen Sie mich wissen, wie Jackson2MessageConverter so, die Nachricht wird direkt an sich bindet mein Objekt/HashMap vom Empfänger Ende. Auch ich habe entfernt die Type_ID nun vom sender.

Wie Nachricht sieht in RabbitMQ

Priorität: 0 delivery_mode: 2-Header:

ContentTypeId: java.lang.Objekt
KeyTypeId: java.lang.Objekt content_encoding: UTF-8 content_type: application/json
{„Execution_start_time“:1473747183636,“status“:“SUCCESS“}

@Component
public class AdapterOutputHandler {

    private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);

    @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
    public void handleAdapterQueueMessage(HashMap<String,Object> message){

        System.out.println("Receiver:::::::::::"+message.toString());

    }

}

Verbindung

@Bean(name="adapterOPListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);

    }

Ausnahme

Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
    at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)

Ich will nicht __TYPE__ID vom sender, weil Sie mehrere Absender für die gleichen Warteschlange und nur ein Verbraucher.

  • es führt zu einer exception ist nicht genug Informationen. Fügen Sie den stacktrace bitte
  • Eigentlich Header in rabbitmq enthält eine Eigenschaft namens type_id_. Dies sollte nicht sein. Wie, um eine Nachricht zu senden, in dem type_id_ Eigenschaft ist nicht vorhanden priority: 0 delivery_mode: 2 __TypeId__: com.diff.approach.JobListenerDTO** content_encoding: UTF-8 content_type: application/json
InformationsquelleAutor BIndu_Madhav | 2016-09-12



2 Replies
  1. 4

    führt es zu einer Ausnahme

    Was Ausnahme?

    TypeId: com.diff.Ansatz.JobListenerDTO

    Dass bedeutet, dass Sie senden ein DTO, nicht eine hash-map, wie Sie beschreiben, in Frage stellen.

    Wenn Sie möchten, entfernen Sie den typeId-header verwenden, können Sie eine Nachricht post Prozessor…

    rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> {
        m.getMessageProperties.getHeaders().remove("__TypeId__");
        return m;
    });
    

    (oder , new MessagePostProcessor() {...} wenn du nicht mit Java 8).

    BEARBEITEN

    Welche version von Spring AMQP verwenden Sie? Mit 1,6-Sie haben nicht einmal zu entfernen, die __TypeId__ header – Rahmen sieht auf den listener-parameter Typ und erzählt die Jackson-Konverter, der Typ, so wird es automatisch konvertiert, dass (wenn es kann). Wie Sie hier sehen können; es funktioniert gut, ohne zu entfernen die Typ-id…

    package com.example;
    
    import java.util.HashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication
    public class So39443850Application {
    
        private static final String QUEUE = "so39443850";
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
            context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
            context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
            context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
            context.close();
        }
    
        private final CountDownLatch latch = new CountDownLatch(1);
    
        @RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
        public void listen(HashMap<String, Object> message) {
            System.out.println(message.getClass() + ":" + message);
            latch.countDown();
        }
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            return template;
        }
    
        @Bean
        public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    
        public static class DTO {
    
            private String foo;
    
            private String baz;
    
            public DTO(String foo, String baz) {
                this.foo = foo;
                this.baz = baz;
            }
    
            public String getFoo() {
                return this.foo;
            }
    
            public void setFoo(String foo) {
                this.foo = foo;
            }
    
            public String getBaz() {
                return this.baz;
            }
    
            public void setBaz(String baz) {
                this.baz = baz;
            }
    
        }
    
    }
    

    Ergebnis:

    class java.util.HashMap:{foo=baz, baz=qux}
    

    Dies ist beschrieben in die Dokumentation

    In Versionen vor 1.6, die Typ-Informationen zum konvertieren der JSON-werden im message-Header oder eine benutzerdefinierte ClassMapper erforderlich war. Beginnend mit der version 1.6, wenn es gibt keine Typ-information-Header, der Typ kann abgeleitet werden aus der Ziel-Methode-Argumente.

    Können Sie auch so konfigurieren, dass eine benutzerdefinierte ClassMapper immer zurück HashMap.

    • Dank Gary, noch Eine Frage, Wie man diese Meldung erhalten ? headers: __ContentTypeId__: java.lang.Object __KeyTypeId__: java.lang.Object content_encoding: UTF-8 content_type: application/json
    • Ich erhalte die Meldung wie unten @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE) public void handleAdapterQueueMessage(Message message){ byte[] body = message.getBody(); } Wie konvertieren von byte [], um die hashmap zurück ??
    • Nicht platzieren Sie code in den Kommentaren – es ist unleserlich – Bearbeiten Sie Ihre Frage statt. Sie brauchen eine Jackson2JsonMessageConverter im listener-container Fabrik.
    • Hallo Gary, ich habe meine Empfänger-code, könnten Sie bitte empfehlen, wie zu verwenden Jackson2JsonMessageConverter
    • Welche version verwenden Sie? Siehe mein edit.
    • Version von Spring AMQP – 1.6.1.RELEASE Version von Spring-RabbitMQ – 1.5.6.RELEASE
    • Warum verwenden Sie nicht übereinstimmende Versionen? Das wird nicht funktionieren; Sie müssen die gleiche version aufweisen. Wenn Sie mit maven oder gradle, brauchen Sie nur zu erklären spring-rabbit und der entsprechenden version von spring-amqp wird automatisch eingezogen (transitiv). Unabhängig davon, sollten Sie immer die gleichen Versionen für beide Gläser; die aktuelle version ist 1.6.2.RELEASE – siehe die Projektseite.
    • Ja, Gary. Habe ich die gleichen Versionen für beide jetzt. Es arbeitete. Danke!

  2. 0
    • Möchten, verwenden Sie „a“ verschiedene Java-calss, wenn Sie eine Nachricht erhalten?

      Config @Bean Jackson2JsonMessageConverter mit einem benutzerdefinierten ClassMapper

    • Nutzen wollen „viele“ verschiedene Java-calss, wenn Sie eine Nachricht erhalten? wie :

      @MyAmqpMsgListener
      void handlerMsg(
              //Main message class, by MessageConverter
              @Payload MyMsg myMsg, 
      
              //Secondary message class - by MessageConverter->ConversionService
              @Payload Map<String, String> map,
      
              org.springframework.messaging.Message<MyMsg> msg,
              org.springframework.amqp.core.Message amqpMsg
      ) {
          //...
      }
      

      Geben Sie einen benutzerdefinierten @Bean Converter, ConversionServiceRabbitListenerAnnotationBeanPostProcessor :

      @Bean
      FormattingConversionServiceFactoryBean rabbitMqCs(
              Set<Converter> converters
      ) {
          FormattingConversionServiceFactoryBean fac = new FormattingConversionServiceFactoryBean();
          fac.setConverters(converters);
          return fac;
      }
      @Bean
      DefaultMessageHandlerMethodFactory messageHandlerMethodFactory(
              @Qualifier("rabbitMqCs")
              FormattingConversionService rabbitMqCs
      ) {
          DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
          defaultFactory.setConversionService(rabbitMqCs);
          return defaultFactory;
      }
      
      //copied from RabbitBootstrapConfiguration
      @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
      @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
      public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor(
              MessageHandlerMethodFactory handlerFac
      ) {
          RabbitListenerAnnotationBeanPostProcessor bpp = new RabbitListenerAnnotationBeanPostProcessor();
          bpp.setMessageHandlerMethodFactory(handlerFac);
          return bpp;
      }
      
      @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
      public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
          return new RabbitListenerEndpointRegistry();
      }
      

    Referenzen:

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.