1. 程式人生 > >學習筆記- 自己寫的關於生產者與消費者模式,還有定時任務的demo。

學習筆記- 自己寫的關於生產者與消費者模式,還有定時任務的demo。

為了加深對生產者消費者模式的理解,特意寫了這個demo,裡面還包含了一個自己寫的定時任務。程式碼下載地址:http://download.csdn.net/detail/li_yan_fei/9811572

是個maven專案,只用了spring框架。

學到的內容有3個

第一:加深了對生產者消費者模式的理解

第二:java Object 的wait() timeout數值如果等於0,則會造成執行緒一直等待下去,除非被notify喚醒

第三:java中main函式主執行緒死掉不會影響其他執行緒的正常執行(除了守護執行緒)。因為main函式主執行緒和其他非守護執行緒是一樣的。

程式碼:

1.一個實現了按時間排序的佇列

package com.lyf.task;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.beanutils.BeanUtils;

import com.lyf.bean.TaskInfo;

/**
 * @ClassName: TaskStack
 * @Description: 實現了一個按照時間順序排列的佇列
 * @author yanfei.li
 * @date 2017年4月10日 下午1:53:49
 * @Company:
 */
public class TaskStack {

	private List<TaskInfo> queue = new ArrayList<TaskInfo>();

	public TaskStack() {
	}

	/**
	 * 功能描述: 往佇列裡塞任務
	 * @Title: push
	 * @author yanfei.li
	 * @date 2017年4月11日 上午11:41:50 
	 * @param taskInfo 
	 * @return void    
	 * @throws
	 */
	public synchronized void push(TaskInfo taskInfo) {

		// 如果是空佇列,直接將任務放進去就可以了
		if (this.queue.isEmpty()) {
			this.queue.add(taskInfo);
			// 喚醒正在呼叫pop的消費者執行緒
			this.notify();
			return;
		}
		// 如果佇列不是空的,就要比較執行時間了,根據執行時間排序
		for (int index = 0; index < this.queue.size(); index++) {
			TaskInfo info = this.queue.get(index);
			if (info.getRunTime() > taskInfo.getRunTime()) {
				this.queue.add(index, info);
				this.notify();
				return;
			}
		}
	}

	/**
	 * 功能描述: 從佇列裡取任務
	 * @Title: pop
	 * @author yanfei.li
	 * @date 2017年4月11日 上午11:41:28 
	 * @return TaskInfo    
	 * @throws
	 */
	public synchronized TaskInfo pop() {

		// 如果佇列裡沒有,就釋放鎖,等待喚醒
		if (this.queue.isEmpty()) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		while (true) {
			TaskInfo taskInfo = this.queue.get(0);
			Long now = System.currentTimeMillis();
			if (now >= taskInfo.getRunTime()) {// 如果取出的任務到了執行的時間了,就返回該任務並且從佇列中移除此任務。這裡一定要注意了 判斷條件一定要有=號,因為很可能出現相等的情況,如果進入了else中,就會造成wait(0).如果沒有notify就一直等下去了
				this.queue.remove(0);
				return taskInfo;
			} else {
				try {
					System.out.println("pop--------" + (taskInfo.getRunTime() - now));
					this.wait(taskInfo.getRunTime() - now);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * 功能描述: 從佇列裡移除任務
	 * @Title: remove
	 * @author yanfei.li
	 * @date 2017年4月11日 上午11:41:06 
	 * @param taskInfo 
	 * @return void    
	 * @throws
	 */
	public synchronized void remove(TaskInfo taskInfo) {
		this.queue.remove(taskInfo);
	}

	/**
	 * 功能描述: 返回佇列裡所有的任務
	 * @Title: getAll
	 * @author yanfei.li
	 * @date 2017年4月11日 上午11:40:36 
	 * @return List<TaskInfo>    
	 * @throws
	 */
	public synchronized List<TaskInfo> getAll() {
		List<TaskInfo> retList = new ArrayList<TaskInfo>();
		for (TaskInfo taskInfo : this.queue) {
			try {
				TaskInfo retTask = new TaskInfo();
				BeanUtils.copyProperties(retTask, taskInfo);
				retList.add(retTask);
			} catch (IllegalAccessException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InvocationTargetException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

		}
		return retList;
	}
}

2.消費者
package com.lyf.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lyf.bean.TaskInfo;
import com.lyf.util.SpringContextUtils;

/**
 * @ClassName: TaskExecutor 
 * @Description: 任務執行器,相當於消費者
 * @author yanfei.li 
 * @date 2017年4月10日 下午1:52:00 
 * @Company: 
 *
 */
public class TaskExecutor implements Runnable {

	private TaskStack taskStack = null;//任務佇列
	private ExecutorService fixedThreadPool = null;//執行緒執行池,選擇的固定執行緒池大小,由sping初始化
	
	public TaskExecutor(TaskStack taskStack,int poolSize) {
		this.taskStack = taskStack;
		this.fixedThreadPool = Executors.newFixedThreadPool(poolSize);
	}
	
	public void run() {
		
		while(true){
			try {
				//獲取當前要執行的任務
				TaskInfo taskInfo = taskStack.pop();
				//解析taskinfo資訊,獲取真正執行的task
				TaskInterface instance = (TaskInterface)SpringContextUtils.getBean(taskInfo.getApi());
				//將任務資訊,傳遞給將要執行的task
				instance.setTaskInfo(taskInfo);
				//將任務放到執行緒池中執行
				this.fixedThreadPool.submit(instance);
				//如果是週期性的任務,則計算出下次執行時間,然後放到佇列中
				if(instance.hasNext()){
					this.taskStack.push(instance.next());
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

}

3.任務bean
package com.lyf.bean;

import java.io.Serializable;

/**
 * @ClassName: TaskInfo
 * @Description: 任務實體
 * @author yanfei.li
 * @date 2017年4月10日 下午1:54:58
 * @Company:
 *
 */
public class TaskInfo implements Serializable {

	private static final long serialVersionUID = 8609311967819063807L;

	private String id;// 任務id

	private String type;// 任務型別

	private String runAt;// 執行時間規則

	private String cron;// cron表示式

	private long runTime;// 執行時間

	private String api;// 執行介面

	private Object[] params;// 任務引數

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public String getRunAt() {
		return runAt;
	}

	public void setRunAt(String runAt) {
		this.runAt = runAt;
	}

	public String getCron() {
		return cron;
	}

	public void setCron(String cron) {
		this.cron = cron;
	}

	public long getRunTime() {
		return runTime;
	}

	public void setRunTime(long runTime) {
		this.runTime = runTime;
	}

	public String getApi() {
		return api;
	}

	public void setApi(String api) {
		this.api = api;
	}

	public Object[] getParams() {
		return params;
	}

	public void setParams(Object[] params) {
		this.params = params;
	}

}

4.任務介面
package com.lyf.task;

import com.lyf.bean.TaskInfo;

/**
 * @ClassName: TaskInterface 
 * @Description: TODO
 * @author yanfei.li 
 * @date 2017年4月11日 下午12:00:10 
 * @Company: 
 *
 */
public interface TaskInterface extends Runnable {

	/**
	 * 功能描述: 設定任務
	 * @Title: setTaskInfo
	 * @author yanfei.li
	 * @date 2017年4月11日 下午12:03:28 
	 * @param taskInfo 
	 * @return void    
	 * @throws
	 */
	public void setTaskInfo(TaskInfo taskInfo);
	/**
	 * 功能描述: 判斷此任務是否還需要執行,針對的是迴圈性的任務
	 * @Title: hasNext
	 * @author yanfei.li
	 * @date 2017年4月11日 下午12:02:14 
	 * @return boolean    
	 * @throws
	 */
	public boolean hasNext();
	/**
	 * 功能描述: 返回下次要執行的任務
	 * @Title: next
	 * @author yanfei.li
	 * @date 2017年4月11日 下午12:02:18 
	 * @return TaskInfo    
	 * @throws
	 */
	public TaskInfo next();
}
5.實現了任務介面的任務抽象模板類

package com.lyf.task;

import com.lyf.bean.TaskInfo;
import com.lyf.util.SpringCronResolveUtil;

/**
 * @ClassName: TaskInstance 
 * @Description: 任務執行的模板類
 * @author yanfei.li 
 * @date 2017年4月11日 下午2:47:57 
 * @Company: 
 *
 */
public abstract class TaskInstance implements TaskInterface {

	protected TaskInfo taskInfo;
	
	
	public void run() {
		
		try {
			this.before();
			this.execute();
			this.after();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			//如果要記錄日誌什麼的,可以放到這裡
		}

	}
	
	/**
	 * 功能描述: 任務執行前執行的方法,比如做一些初始化工作
	 * @Title: before
	 * @author yanfei.li
	 * @date 2017年4月11日 下午2:50:43  
	 * @return void    
	 * @throws
	 */
	protected abstract void before() throws Exception;
	
	/**
	 * 功能描述: 任務執行的具體方法
	 * @Title: excute
	 * @author yanfei.li
	 * @date 2017年4月11日 下午2:51:25  
	 * @return void    
	 * @throws
	 */
	protected abstract void execute() throws Exception;
	
	/**
	 * 功能描述: 任務執行完後執行的方法
	 * @Title: after
	 * @author yanfei.li
	 * @date 2017年4月11日 下午2:51:41  
	 * @return void    
	 * @throws
	 */
	protected abstract void after() throws Exception;
	
	
	public void setTaskInfo(TaskInfo taskInfo) {
		this.taskInfo = taskInfo;
	}

	public boolean hasNext() {
		if (this.taskInfo != null && !this.taskInfo.getRunAt().equals("now")) {
			return true;
		}
		return false;
	}

	public TaskInfo next() {
		if (this.taskInfo != null) {
			this.taskInfo.setRunTime(
					SpringCronResolveUtil.nextExecutionTime(this.taskInfo.getCron(), this.taskInfo.getRunTime()));
			System.out.println("next===========" + (taskInfo.getRunTime() - System.currentTimeMillis()));
			return this.taskInfo;
		}
		return null;
	}

}

6.具體任務實現類
package com.lyf.task;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

@Service("MyTask")
@Scope("prototype")
public class MyTask extends TaskInstance {

	@Override
	protected void before() throws Exception {
		System.out.println("MyTask--------------before-------------");

	}

	@Override
	protected void execute() throws Exception {
		System.out.println("MyTask--------------execute-------------");

	}

	@Override
	protected void after() throws Exception {
		System.out.println("MyTask--------------after-------------");
	}

}

7.任務service
package com.lyf.task;

import com.lyf.bean.TaskInfo;

public interface TaskService {

	public void startTask(TaskInfo task);
}

8.實現類

package com.lyf.task;

import com.lyf.bean.TaskInfo;
import com.lyf.util.SpringCronResolveUtil;

public class TaskServiceImpl implements TaskService{

	private TaskStack taskStack = null;
	private TaskExecutor executor = null;
	
	public TaskServiceImpl(Integer poolSize) {
		this.taskStack = new TaskStack();
		if(poolSize != null){
			executor = new TaskExecutor(this.taskStack, poolSize);
			new Thread(executor).start();
		}
		this.init();
	}
	
	private void init(){
		//做一些其他的初始化工作
	}

	public void startTask(TaskInfo task) {
		
		if (task == null) {
			return;
		}
		
		//首次執行,設定runTime
		if (task.getRunAt().equals("now")) {
			task.setRunTime(System.currentTimeMillis() - 1);
		} else {
			if (task.getCron() == null || "".equals(task.getCron().trim())) {
				return;
			}
			task.setRunTime(SpringCronResolveUtil.nextExecutionTime(task.getCron()));
		}
		this.taskStack.push(task);
	}
	
}

9.從spring容器中獲取bean的工具類(在多執行緒環境下使用spring註解無法注入bean,所以需要手動獲取bean


package com.lyf.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class SpringContextUtils implements ApplicationContextAware {

	private static ApplicationContext applicationContext = null;
	/**
	 * 當繼承了ApplicationContextAware類之後,那麼程式在呼叫 
     * getBean(String)的時候會自動呼叫該方法,不用自己操作  
	 */
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		SpringContextUtils.applicationContext = applicationContext;
	}
	
	public static ApplicationContext getApplicationContext(){
		return SpringContextUtils.applicationContext;
	}
	/**
	 * 功能描述: 根據name從spring容器中返回bean
	 * @Title: getBean
	 * @author yanfei.li
	 * @date 2017年4月11日 下午1:47:17 
	 * @param name
	 * @throws BeansException 
	 * @return Object    
	 */
	public static Object getBean(String name)throws BeansException{
		return applicationContext.getBean(name);
	}
	/**
	 * 功能描述: 根據name和型別從spring容器中返回bean
	 * @Title: getBean
	 * @author yanfei.li
	 * @date 2017年4月11日 下午1:46:46 
	 * @param name
	 * @param requireType
	 * @throws BeansException 
	 * @return Object    
	 */
	public static <T> Object getBean(String name,Class<T> requireType)throws BeansException{
		return applicationContext.getBean(name,requireType);
	}
	
}

10.解析cron表示式的util(表示式只能是以空格分割的包含6個字元的字串,不然會報錯

package com.lyf.util;

import java.util.Date;

import org.springframework.scheduling.support.CronSequenceGenerator;

/**
 * @ClassName: SpringCronResolveUtil 
 * @Description: 解析cron字串的工具類
 * @author yanfei.li 
 * @date 2017年4月11日 下午2:59:44 
 * @Company: 
 *
 */
public class SpringCronResolveUtil {

	/**
	 * 功能描述: 根據當前時間計算並返回下次執行時間
	 * @Title: nextExecutionTime
	 * @author yanfei.li
	 * @date 2017年4月11日 下午3:00:56 
	 * @param cron 表示式字串,包含6個以空格分開的字元
	 * @return long    
	 * @throws
	 */
	public static long nextExecutionTime(String cron){
		CronSequenceGenerator cronSequenceGenerator = new CronSequenceGenerator(cron);
		Date lastTime = new Date();
		Date nexDate = cronSequenceGenerator.next(lastTime);
		
		return nexDate.getTime();
		
	}
	
	/**
	 * 功能描述: 根據最後一次執行時間計算並返回下次執行時間
	 * @Title: nextExecutionTime
	 * @author yanfei.li
	 * @date 2017年4月11日 下午3:00:23 
	 * @param cron 表示式字串,一定要是包含6個以空格分離的字元
	 * @param lastTime 最近的執行時間
	 * @return long    
	 * @throws
	 */
	public static long nextExecutionTime(String cron,long lastTime) {
		Date date = new Date(lastTime);
		CronSequenceGenerator cronSequenceGenerator = new CronSequenceGenerator(cron);
		Date nexDate = cronSequenceGenerator.next(date);
		return nexDate.getTime();
	}
	
	public static void main(String[] args) {
		String cron = "0/10 * * * * ? ";
		System.out.println("當前時間:" + new Date().getTime()); 
		System.out.println("下一次時間:" + nextExecutionTime(cron));
	}
}

11.測試

package com.lyf.producerandconsumer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.lyf.bean.TaskInfo;
import com.lyf.task.TaskService;
import com.lyf.util.SpringContextUtils;

/**
 * @ClassName: TestMain 
 * @Description: 測試
 * @author yanfei.li 
 * @date 2017年4月11日 下午6:20:30 
 * @Company: 
 *
 */
public class TestMain {

	public static void main(String[] args) throws InterruptedException {
		System.out.println("---------------");
		ApplicationContext ac = new ClassPathXmlApplicationContext("com/lyf/task/local-spring.xml");
		TaskInfo taskInfo = new TaskInfo();
		taskInfo.setApi("MyTask");
		taskInfo.setRunAt("some");
		taskInfo.setCron("0/10 * * * * ?");//每隔10秒執行一次
		taskInfo.setType("me");
		System.out.println("++++++++++++++++" + ac);
		TaskService taskService = (TaskService) SpringContextUtils.getBean("TaskService");
		taskService.startTask(taskInfo);
		
		System.out.println("===========" + taskService);
		
		//主執行緒的死掉,不會影響其他執行緒的繼續執行,除非是守護執行緒。

	}

}

12.pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.lyf</groupId>
  <artifactId>producerandconsumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>producerandconsumer</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
		<dependency>
			<groupId>commons-beanutils</groupId>
			<artifactId>commons-beanutils</artifactId>
			<version>1.8.3</version>
		</dependency>
    		<!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-aop</artifactId>
			<version>4.0.4.RELEASE</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework/spring-beans -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
			<version>4.0.4.RELEASE</version>
		</dependency>


		<!-- https://mvnrepository.com/artifact/org.springframework/spring-context -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>4.0.4.RELEASE</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework/spring-core -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>4.0.4.RELEASE</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.springframework/spring-tx -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-tx</artifactId>
			<version>4.0.4.RELEASE</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-web</artifactId>
			<version>4.0.4.RELEASE</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.springframework/spring-expression -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-expression</artifactId>
			<version>4.0.4.RELEASE</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/aopalliance/aopalliance -->
		<dependency>
			<groupId>aopalliance</groupId>
			<artifactId>aopalliance</artifactId>
			<version>1.0</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.aspectj/aspectjweaver -->
		<dependency>
			<groupId>org.aspectj</groupId>
			<artifactId>aspectjweaver</artifactId>
			<version>1.8.0</version>
		</dependency>
				<!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.1.3</version>
		</dependency>
  </dependencies>
</project>

13.spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
		http://www.springframework.org/schema/aop
                     http://www.springframework.org/schema/aop/spring-aop.xsd ">

	<!-- 啟動註解掃描 -->
	<context:annotation-config/>
	<!-- 指定掃描的路徑 -->
	<context:component-scan base-package="com.lyf.*" > 
		<!-- 不對controller的註解 做處理,過濾掉,是為了和springmvc整合時,防止重複掃描,造成bean初始化2次-->
		<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
	</context:component-scan>
    
    <bean id="springUtils" class="com.lyf.util.SpringContextUtils"/>
    
	<bean name="TaskService" class="com.lyf.task.TaskServiceImpl">
		<constructor-arg>
			<value>5</value>
		</constructor-arg>
	</bean>


</beans>

14.測試結果(每隔十秒執行一次,只貼出了部分列印資料

++++++++++++++++org[email protected]23e352bf: startup date [Wed Apr 12 11:44:46 GMT+08:00 2017]; root of context hierarchy
[email protected]
pop--------3621
MyTask--------------before-------------
MyTask--------------execute-------------
MyTask--------------after-------------
next===========9997
pop--------9997
MyTask--------------before-------------
MyTask--------------execute-------------
MyTask--------------after-------------
next===========9999
pop--------9999
MyTask--------------before-------------
MyTask--------------execute-------------
MyTask--------------after-------------



相關推薦

學習筆記- 自己的關於生產者消費者模式還有定時任務demo

為了加深對生產者消費者模式的理解,特意寫了這個demo,裡面還包含了一個自己寫的定時任務。程式碼下載地址:http://download.csdn.net/detail/li_yan_fei/9811572 是個maven專案,只用了spring框架。 學到的內容有3個

java生產者消費者模式

食物 會有 實現 條件 3.2 釋放 tac lee trace 前言: 生產者和消費者模式是我們在學習多線程中很經典的一個模式,它主要分為生產者和消費者,分別是兩個線程, 目錄 一:生產者和消費者模式簡介 二:生產者和消費者模式的實現 聲明:本例來源於java經典著作:《

Java並發編程(4)--生產者消費者模式介紹

定義 一點 ava empty 釋放 nal cond 場景 inter 一、前言   這種模式在生活是最常見的,那麽它的場景是什麽樣的呢? 下面是我假象的,假設有一個倉庫,倉庫有一個生產者和一個消費者,消費者過來消費的時候會檢測倉庫中是否有庫存,如果沒有了則等待生產,如

python 中生產者消費者 模式

生產者與消費者 模式 摘要: 最近一段時間,寫了一些生產者與消費者模型, 特此此文 作為總結. 文中總結了不同的 生產者與消費者的情況. 一個生產者, 多個消費者,與一個生產者,多個消費者 的程式設計模式. 一. 生產者與消費者 在軟體開發的過程中,經常碰

Java併發程式設計(4)--生產者消費者模式介紹

一、前言   這種模式在生活是最常見的,那麼它的場景是什麼樣的呢? 下面是我假象的,假設有一個倉庫,倉庫有一個生產者和一個消費者,消費者過來消費的時候會檢測倉庫中是否有庫存,如果沒有了則等待生產,如果有就先消費直至消費完成;而生產者每天的工作就是先檢測倉庫是否有庫存,如果沒有就開始生產,滿倉了就停止生產等待

程式碼編寫生產者消費者模式思路

##   **目前有個任務是建立大量的資料同時向kafka裡寫入,於是之前開了大量的執行緒建立資料並寫入,發現kafka並不能連線那麼多執行緒,後來就用到生產者與消費者模式,大量的執行緒生產資料放入佇列中,然後只開幾個執行緒從佇列中獲取並寫入kafka.**      

Java生產者消費者模式的簡單寫法

生產者消費者模式是研究多執行緒程式的經典問題之一,它描述是有一塊緩衝區作為快取佇列/倉庫,生產者可以將產品放入佇列,消費者則可以從佇列中取走產品。大多數的後臺服務程式併發控制的基本原理都可以歸納為生產者消費者模式。 1、使用Synchronized()、wait() 、 n

JAVA生產者消費者模式 BlockingQueueExecutorService例項

首先建立生產者: package ProducerAndConsumer; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Ti

實現生產者消費者模式

產品 style 等待 pytho ask 緩沖 制造 目錄 tex 實現生產者與消費者模式 目錄 生產者與消費者模式 實現 生產者與消費者模式 什麽是生產者消費者模式 生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼

java多執行緒--ReentrantLock實現生產者消費者模式

一.本例實現 :一對一交替列印, 一.生產者邏輯 :每次只允許一個生產者來進行生產操作(生產者之間互斥訪問倉庫),必須等消費者取走資料之後,才能進行下一次的生產 二.消費者邏輯 :每次只允許一個消費者來進行生產操作(消費者之間互斥訪問倉庫),必須等生產者生產資料之後,才能

loadrunner學習筆記-場景計劃方式執行模式

手工場景-計劃方式scenario :多個指令碼之間按照設定的場景計劃來統一執行。group : 多個指令碼之間按照獨立設定模式跑,各個指令碼可以單獨設定虛擬使用者,執行時間等手工執行場景-執行模式real - world schedule (實際計劃)可以通過add  ac

生產者消費者模式(執行緒的同步互斥)

條件變數 條件變數的提出首先要涉及一個概念,就是生產者消費者模型: 生產者消費者,是在多執行緒同步的一個問題,兩個固定大小緩衝區的執行緒,在實際執行是會發生問題,生產者是生成資料放入緩衝區,重複過程,消費者在緩衝區取走資料。 生產者消費者的模型提出了三種關係,兩種角色,

多執行緒實現生產者消費者模式

生產者-消費者模式的簡介: 在實際的軟體開發過程中,我們將產生資料的模組稱為生產者,處理資料的模組成為消費者。但僅有這兩者還不夠成為一個生產者-消費者模式,還需要有一個緩衝區(一段記憶體區域)作為中介,生產者產生的資料放入緩衝區,消費者從緩衝區讀取資料並處理。

IOS多執行緒使用GCD訊號量實現生產者消費者模式

一、原理的簡述   在生產者消費者模式當中,首先需要分清在這個模式當中有哪些角色? 各角色分別擔任什麼職責與它們之間的關係如何? 角色之間是在保證資料的準確性的情況下如何通訊(同步資料)的? 假設現在有一個這樣的情形: 有兩個人共同訪問一個容量有限的倉庫,這2個人,

九、生產者消費者模式

# 生產者消費者模式 - 生產者消費者模式是程式設計中非常常見的一種設計模式,被廣泛運用在解耦、訊息佇列等場景。 - - 使用生產者消費者模式通常需要在兩者之間增加一個阻塞佇列作為媒介,有了媒介之後就相當於有了一個緩衝,平衡了兩者的能力。 - 整體如上圖所示,最上面是阻塞佇列,右側的 1 是生產者執行緒

自己的資料庫連結類請高手指點一下

using System; using System.Collections.Generic; using System.Text; using System.Data; using System.Data.OleDb; namespace DatabaseCo

Java學習(7):同步問題之生產者消費者的問題

con runnable pop pre 標記 this auth style about 生產者生產饅頭,消費者消費饅頭。一個籃子,生產者往籃子中放饅頭,消費者從籃子中取饅頭。 /** * 這是一個籃子類 * * @author xcx * @time 2017

廣播模式下的生產者消費者fanout模式

生成 ack word 需要 bin 隊列 highlight time host 生產者 #coding=utf-8 import pika import sys connection = pika.BlockingConnection(pika.ConnectionP

RabbitMQ學習——生產者消費者入門例子

生產者 package com.learn.rabbitmqapi.message; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.cli

多執行緒學習生產者消費者

感悟:   生產者和消費者其實就是一個很簡單的一個多執行緒同步的例子,但是我卻花了好多天才能理解。反思下來就是缺乏實踐,這聽起來感覺很簡單的一個道理,但是能做到可就很難了。當初在看這個生產者和消費者例項的時候,看著感覺程式碼能簡單,大致上能理解,但是到了轉天自己卻又寫不出來了