使用线程执行框架的一次过程

场景

一个线程从某个地方接收消息(数据),可以是其他主机或者消息队列,然后转由另外的一个线程池来执行具体处理消息的逻辑,并且消息的处理速度小于接收消息的速度。这种情景很常见,试想一下,你会怎么设计和实现?

直观想法

很显然采用JUC的线程框架,可以迅速写出代码。

消息接收者:


  1. public class Receiver { 
  2.     private static volatile boolean inited = false
  3.     private static volatile boolean shutdown = false
  4.     private static volatile int cnt = 0
  5.  
  6.     private MessageHandler messageHandler; 
  7.  
  8.     public void start(){ 
  9.         Executors.newSingleThreadExecutor().execute(new Runnable() { 
  10.             @Override 
  11.             public void run() { 
  12.                 while(!shutdown){ 
  13.                     init(); 
  14.                     recv(); 
  15.                 } 
  16.             } 
  17.         }); 
  18.     } 
  19.  
  20.     /** 
  21.      * 模拟消息接收 
  22.      */ 
  23.     public void recv(){ 
  24.             Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start(); 
  25.     } 

消息处理:


  1. public class MessageHandler { 
  2.  
  3.     private static final int THREAD_POOL_SIZE = 4
  4.  
  5.     private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE); 
  6.  
  7.     public void handle(Message msg) { 
  8.         try { 
  9.             service.execute(new Runnable() { 
  10.                 @Override 
  11.                 public void run() { 
  12.                     parseMsg(msg); 
  13.                 } 
  14.             }); 
  15.         } catch (Throwable e) { 
  16.             System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("解析消息:" + message); Thread.sleep(5000); System.out.println("============================"); } catch (InterruptedException e) { 
  17.                 e.printStackTrace(); 
  18.             } 
  19.  
  20.         } 
  21.     } 

效果:这种方案导致的现象是接收到的消息会迅速堆积,我们从消息队列(或者其他地方)取出了大量消息,但是处理线程的速度又跟不上,所以导致的问题是大量的Task会堆积在线程池底层维护的一个阻塞队列中,这会极大的耗费存储空间,影响系统的性能。

分析:当execute()一个任务的时候,如果有空闲的worker线程,那么投入运行,否则看设置的***线程个数,没有达到线程个数限制就创建新线程,接新任务,否则就把任务缓冲到一个阻塞队列中,问题就是这个队列,默认的大小是没有限制的,所以就会大量的堆积任务,必然耗费heap空间。

【声明】:芜湖站长网内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。

相关文章