创建监听器
package com.ssyouxia.listener;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Redis {@link MessageListener} that only counts how many times it has been invoked.
 *
 * <p>The message payload and the pattern are ignored; each call to
 * {@link #onMessage(Message, byte[])} adds one to an internal counter that can be
 * read back through {@link #getCount()}.
 *
 * <p>Created by lianfangfang on 2019/2/28.
 */
public class RedisMessageListener implements MessageListener {

    /**
     * Number of {@code onMessage} invocations so far. An {@link AtomicInteger} keeps
     * each increment atomic even if the callback and the reader run on different threads.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Records the delivery of one message by incrementing the counter.
     *
     * @param message the received message (not inspected)
     * @param pattern the pattern that matched the channel (not inspected)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        count.incrementAndGet();
    }

    /**
     * Returns the number of messages counted so far.
     *
     * @return the current value of the counter
     */
    public int getCount() {
        return count.get();
    }
}
加载监听器
package com.ssyouxia.config;import com.ssyouxia.listener.RedisMessageListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Import;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;/** * Created by lianfangfang on 2019/2/28. */@Configuration@Import(SpringDataRedisConfig.class)public class RedisPubsubConfiguration { @Bean @Autowired public RedisMessageListenerContainer container( final JedisConnectionFactory connectionFactory) { final RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listener(), new ChannelTopic("test-channel")); return container; } @Bean public MessageListener listener() { return new RedisMessageListener(); }}
测试类
package com.ssyouxia.config;

import com.ssyouxia.listener.RedisMessageListener;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.concurrent.Callable;

import static com.jayway.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

/**
 * Integration test for Redis publish/subscribe: publishes three messages to
 * {@code "test-channel"} and waits for the listener's counter to reach three.
 *
 * <p>Runs against the Spring context defined by {@link RedisPubsubConfiguration}.
 *
 * <p>Created by lianfangfang on 2019/2/28.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = RedisPubsubConfiguration.class)
public class RedisPublishSubscriberTestCase {

    /**
     * Template used to publish messages. Only {@code convertAndSend(String, Object)} is
     * called, so the key/value type arguments are irrelevant here and wildcards suffice.
     */
    @Autowired
    private RedisTemplate<?, ?> template;

    /** The listener whose invocation count is asserted. */
    @Autowired
    private RedisMessageListener listener;

    /**
     * Verifies that no message has been counted before publishing, then publishes three
     * messages and waits up to one second for the counter to equal three.
     */
    @Test
    public void testPublishSubscribe() {
        assertThat(listener.getCount(), equalTo(0));

        template.convertAndSend("test-channel", "Test Message 1!");
        template.convertAndSend("test-channel", "Test Message 2!");
        template.convertAndSend("test-channel", "Test Message 3!");

        // Poll the counter instead of asserting immediately: the test waits (at most one
        // second) for the listener to have been invoked three times.
        await().atMost(1, SECONDS).until(
                new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        return listener.getCount();
                    }
                },
                equalTo(3)
        );
    }
}