什么是分布式队列任务队列

他的最新文章
他的热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)2被浏览24分享邀请回答0添加评论分享收藏感谢收起分布式队列神器 Celery_Python开发者_传送门
分布式队列神器 Celery
Python开发者
(点击上方蓝字,快速关注我们)来源:rapospectresegmentfault.com/a/2050Celery 是什么?Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。Celery 专注于实时任务处理,支持任务调度。说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。1.快速入门(本文以 Celery4.0 为基础进行书写)首先,我们要理解 Celery 本身不是任务队列,它是管理分布式任务队列的工具,或者换一种说法,它封装好了操作常见任务队列的各种操作,我们用它可以快速进行任务队列的使用与管理,当然你也可以自己看 rabbitmq 等队列的文档然后自己实现相关操作都是没有问题的。Celery 是语言无关的,虽然它是用 Python 实现的,但他提供了其他常见语言的接口支持。只是如果你恰好使用 Python 进行开发那么使用 Celery 就自然而然了。想让 Celery 运行起来我们要明白几个概念:1.1 Brokersbrokers 中文意思为中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列)常见的 brokers 有 rabbitmq、redis、Zookeeper 等1.2 Result Stores / backend顾名思义就是结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了常见的 backend 有 redis、Memcached 甚至常用的数据都可以。1.3 Workers就是 Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行1.4 Tasks就是我们想在队列中进行的任务咯,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。理解以上概念后我们就可以快速实现一个队列的操作:这里我们用 redis 当做 celery 的 broker 和 backend。(其他 brokers 与 backend 支持看这里(http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html))安装 Celery 和 redis 以及 python 的 redis 支持:apt-get install redis-serverpip install redispip install celery这里需要注意如果你的 celery 是 4.0 及以上版本请确保 python 的 redis 库版本在 2.10.4 及以上,否则会出现 redis 连接 timeout 的错误,具体参考(https://github.com/celery/celery/issues/3580)然后,我们需要写一个task:#tasks.pyfrom celery import Celery app = Celery('tasks',
backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') #配置好celery的backend和broker @app.task
#普通函数装饰为 celery taskdef add(x, y):
return x + yOK,到这里,broker 我们有了,backend 我们有了,task 我们也有了,现在就该运行 worker 进行工作了,在 tasks.py 所在目录下运行:celery -A tasks worker --loglevel=info意思就是运行 tasks 这个任务集合的 worker 进行工作(当然此时broker中还没有任务,worker此时相当于待命状态)最后一步,就是触发任务啦,最简单方式就是再写一个脚本然后调用那个被装饰成 task 的函数:#trigger.pyfrom tasks import addresult = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用while not result.ready():
time.sleep(1)print 'task done: {0}'.format(result.get())运行此脚本delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready() 为 true,然后用 result.get() 取结果即可。到此,一个简单的 celery 应用就完成啦。2. 进阶用法经过快速入门的学习后,我们已经能够使用 Celery 管理普通任务,但对于实际使用场景来说这是远远不够的,所以我们需要更深入的去了解 Celery 更多的使用方式。首先来看之前的task:@app.task
#普通函数装饰为 celery taskdef add(x, y):
return x + y这里的装饰器app.task实际上是将一个正常的函数修饰成了一个 celery task 对象,所以这里我们可以给修饰器加上参数来决定修饰后的 task 对象的一些属性。首先,我们可以让被修饰的函数成为 task 对象的绑定方法,这样就相当于被修饰的函数 add 成了 task 的实例方法,可以调用 self 获取当前 task 实例的很多状态及属性。其次,我们也可以自己复写 task 类然后让这个自定义 task 修饰函数 add ,来做一些自定义操作。2.1 根据任务状态执行不同操作任务执行后,根据任务状态执行不同操作需要我们复写 task 的 on_failure、on_success 等方法:# tasks.pyclass MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print 'task done: {0}'.format(retval)
return super(MyTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
print 'task fail, reason: {0}'.format(exc)
return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo) @app.task(base=MyTask)def add(x, y):
return x + y嗯, 然后继续运行 worker:celery -A tasks worker --loglevel=info运行脚本,得到:再修改下tasks:@app.task
#普通函数装饰为 celery taskdef add(x, y):
raise KeyError
return x + y重新运行 worker,再运行 trigger.py:可以看到,任务执行成功或失败后分别执行了我们自定义的 on_failure、on_success2.2 绑定任务为实例方法# tasks.pyfrom celery.utils.log import get_task_logger logger = get_task_logger(__name__)@app.task(bind=True)def add(self, x, y):
logger.info(self.request.__dict__)
return x + y然后重新运行:执行中的任务获取到了自己执行任务的各种信息,可以根据这些信息做很多其他操作,例如判断链式任务是否到结尾等等。关于 celery.task.request 对象的详细数据可以看这里(http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-request-info)2.3 任务状态回调实际场景中得知任务状态是很常见的需求,对于 Celery 其内建任务状态有如下几种:当我们有个耗时时间较长的任务进行时一般我们想得知它的实时进度,这里就需要我们自定义一个任务状态用来说明进度并手动更新状态,从而告诉回调当前任务的进度,具体实现:# tasks.pyfrom celery import Celeryimport time @app.task(bind=True)def test_mes(self):
for i in xrange(1, 11):
time.sleep(0.1)
self.update_state(state="PROGRESS", meta={'p': i*10})
return 'finish'然后在 trigger.py 中增加:# trigger.pyfrom task import add,test_mesimport sys def pm(body):
res = body.get('result')
if body.get('status') == 'PROGRESS':
sys.stdout.write('\r任务进度: {0}%'.format(res.get('p')))
sys.stdout.flush()
print '\r'
print resr = test_mes.delay()print r.get(on_message=pm, propagate=False)然后运行任务:2.4 定时/周期任务Celery 进行周期任务也很简单,只需要在配置中配置好周期任务,然后在运行一个周期任务触发器( beat )即可:新建 Celery 配置文件 celery_config.py:# celery_config.pyfrom datetime import timedeltafrom celery.schedules import crontab CELERYBEAT_SCHEDULE = {
'ptask': {
'task': 'tasks.period_task',
'schedule': timedelta(seconds=5),
},} CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'配置中 schedule 就是间隔执行的时间,这里可以用 datetime.timedelta 或者 crontab 甚至太阳系经纬度坐标进行间隔时间配置,具体可以参考这里(http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules)如果定时任务涉及到 datetime 需要在配置中加入时区信息,否则默认是以 utc 为准。例如中国可以加上:CELERY_TIMEZONE = 'Asia/Shanghai'然后在 tasks.py 中增加要被周期执行的任务:# tasks.pyapp = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')app.config_from_object('celery_config') @app.task(bind=True)def period_task(self):
print 'period task done: {0}'.format(self.request.id)然后重新运行 worker,接着再运行 beat:celery -A task beat可以看到周期任务运行正常~2.5 链式任务有些任务可能需由几个子任务组成,此时调用各个子任务的方式就变的很重要,尽量不要以同步阻塞的方式调用子任务,而是用异步回调的方式进行链式任务的调用:错误示范@app.taskdef update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(url, page).get()
store_page_info.delay(url, info) @app.taskdef fetch_page(url):
return myhttplib.get(url) @app.taskdef parse_page(url, page):
return myparser.parse_document(page) @app.taskdef store_page_info(url, info):
return PageInfo.objects.create(url, info)正确示范1def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain() @app.task()def fetch_page(url):
return myhttplib.get(url) @app.task()def parse_page(page):
return myparser.parse_document(page) @app.task(ignore_result=True)def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)正确示范2fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])链式任务中前一个任务的返回值默认是下一个任务的输入值之一 ( 不想让返回值做默认参数可以用 si() 或者 s(immutable=True) 的方式调用 )。这里的 s() 是方法 celery.signature() 的快捷调用方式,signature 具体作用就是生成一个包含调用任务及其调用参数与其他信息的对象,个人感觉有点类似偏函数的概念:先不执行任务,而是把任务与任务参数存起来以供其他地方调用。2.6 调用任务前面讲了调用任务不能直接使用普通的调用方式,而是要用类似 add.delay(2, 2) 的方式调用,而链式任务中又用到了 apply_async 方法进行调用,实际上 delay 只是 apply_async 的快捷方式,二者作用相同,只是 apply_async 可以进行更多的任务属性设置,比如 callbacks/errbacks 正常回调与错误回调、执行超时、重试、重试时间等等,具体参数可以参考这里(http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async)2.7 关于 AsyncResultAsyncResult 主要用来储存任务执行信息与执行结果,有点类似 tornado 中的 Future 对象,都有储存异步结果与任务执行状态的功能,对于写 js 的朋友,它有点类似 Promise 对象,当然在 Celery 4.0 中已经支持了 promise 协议,只需要配合 gevent 一起使用就可以像写 js promise 一样写回调:import gevent.monkeymonkey.patch_all() import timefrom celery import Celery app = Celery(broker='amqp://', backend='rpc') @app.taskdef add(x, y):
return x + y def on_result_ready(result):
print('Received result for id %r: %r' % (result.id, result.result,)) add.delay(2, 2).then(on_result_ready)要注意的是这种 promise 写法现在只能用在 backend 是 RPC (amqp) 或 Redis 时。 并且独立使用时需要引入 gevent 的猴子补丁,可能会影响其他代码。 官方文档给的建议是这个特性结合异步框架使用更合适,例如 tornado、 twisted 等。delay 与 apply_async 生成的都是 AsyncResult 对象,此外我们还可以根据 task id 直接获取相关 task 的 AsyncResult: AsyncResult(task_id=xxx)关于 AsyncResult 更详细的内容,可以参考这里(http://docs.celeryproject.org/en/latest/reference/celery.result.html?highlight=result#celery.result.AsyncResult)利用 Celery 进行分布式队列管理、开发将会大幅提升开发效率,关于 Celery 更详细的使用大家可以去参考详细的官方文档(http://docs.celeryproject.org/en/latest/index.html)觉得本文对你有帮助?请分享给更多人关注「Python开发者」看更多技术干货
觉得不错,分享给更多人看到
Python开发者 微信二维码
分享这篇文章
2月2日 0:41
Python开发者 最新文章博客访问: 2341978
博文数量: 211
注册时间:
提供数据库专业技术服务:
专注Oracle/MySQL/sql server,擅长数据库性能处理,紧急救援,特殊恢复,Sql优化
ITPUB论坛APP
ITPUB论坛APP
APP发帖 享双倍积分
IT168企业级官微
微信号:IT168qiye
系统架构师大会
微信号:SACC2013
分类: 开源技术 07:54:29
原文地址: 作者:
一 前言 &&
& &Redis Queue 一款轻量级的P分布式异步任务队列,基于Redis作为broker,将任务存到redis里面,然后在后台执行指定的Job。就目前而言有三套成熟的工具celery,huey ,rq 。按照功能和使用复杂度来排序的话也是 celery&huey&rq. 因为rq 简单,容易上手,所以自己做的系统也会使用RQ作为分布式任务调度系统。
& &因为RQ 依赖于Redis 故需要安装版本&= 2.6.0.具体安装方法请参考《Redis初探》。*nix 系统环境下安装RQ:
pip install rq
无需其他配置即可以使用RQ。
& &RQ 主要由三部分构成 Job ,Queues,Worker 构成。job也就是开发定义的函数用来实现具体的功能。调用RQ 把job 放入队列Queues,Worker 负责从redis里面获取任务并执行,根据具体情况返回函数的结果。
3.1&关于job
& &一个任务(job)就是一个Python对象,具体表现为在一个工作(后台)进程中异步调用一个函数。任何Python函数都可以异步调用,简单的将函数与参数追加到队列中,这叫做入队(enqueueing)。
3.2 关于Queue
& &将任务加入到队列之前需要初始化一个连接到指定Redis的Queue
q=Queue(connection=redis_conn)
rq_test import hello
result = q.enqueue(hello,'yangyi')
& &queue有如下属性:
& &timeout :指定任务最长执行时间,超过该值则被认为job丢失,对于备份任务 需要设置一个比较长的时间 比如24h。
& &result_ttl :存储任务返回值的有效时间,超过该值则失效。
& &ttl :specifies the maximum queued time of the job before it'll be cancelled
& &depends_on :specifies another job (or job id) that must complete before this job will be queued
& &job_id : allows you to manually specify this job's job_id
& &at_front :will place the job at the front of the queue, instead of the back
& &kwargs and args : lets you bypass the auto-pop of these arguments, ie: specify a timeout argument for the underlying job function.
& 需要关注的是 depends_on ,通过该属性可以做级联任务A--&B ,只有当A 执行成功之后才能执行B .
& 通过指定队列的名字,我们可以把任务加到一个指定的队列中:
q = Queue("low", connection = redis_conn)
q.enqueue(hello, "杨一")
&对于例子中的Queue("low"),具体使用的时候可以替换"low"为任意的复合业务逻辑名字,这样就可以根据业务的需要灵活地归类的任务了。一般会根据优先级给队列命名(如:high, medium, low).
&如果想要给enqueue传递参数的情况,可以使用enqueue_call方法。在要传递超时参数的情况下:
q = Queue("low", connection = redis_conn)
q.enqueue_call(func=hello, args= ("杨一",),timeout = 30)
3.3&关于worker
& &Workers将会从给定的队列中不停的循环读取任务,当所有任务都处理完毕就等待新的work到来。每一个worker在同一时间只处理一个任务。在worker中,是没有并发的。如果你需要并发处理任务,那就需要启动多个worker。
& &目前的worker实际上是fork一个子进程来执行具体的任务,也就是说rq不适合windows系统。而且RQ的work是单进程的,如果想要并发执行队列中的任务提高执行效率需要使用threading针对每个任务进行fork线程。
& &worker的生命周期有以下几个阶段组成:
& &1 启动,载入Python环境
& &2 注册,worker注册到系统上,让系统知晓它的存在。
& &3 开始监听。从给定的redis队列中取出一个任务。如果所有的队列都是空的且是以突发模式运行的,立即退出。否则,等待新的任务入队。
& &4 分配一个子进程。分配的这个子进程在故障安全的上下文中运行实际的任务(调用队列中的任务函数)
& &5 处理任务。处理实际的任务。
& &6 循环。重复执行步骤3。
四 如何使用
& &简单的开发一个deamon 函数,用于后端异步调用,注意任意函数都可以加入队列,必须能够在入队的时候 被程序访问到。
#!/usr/bin/env python
#-*- coding:utf-8 -*-
def hello(name):
&&&&print "hello ,%s"%name
&&&&ip='192.168.0.1'
&&&&num=1024
&&&&return name,ip,num
def workat(name):
&&&&print "hello %s ,you r workat youzan.com "%(name)
4.1 构建队列,将任务对象添加到队列里面
&&& from redis import Redis,ConnectionPool
&&& from rq import Queue
&&& pool = ConnectionPool(db=0, host='127.0.0.1', port=6379,
... password='yangyi')
&&& redis_conn = Redis(connection_pool=pool)
&&& q=Queue(connection=redis_conn)
&&& from rq_test import hello
&&& result = q.enqueue(hello,'yangyi')
&&& result = q.enqueue(hello,'youzan.com')
先实例化一个Queue类q,然后通过enqueue方法发布任务。第一个参数是执行的函数名,后面是函数执行所需的参数,可以是args也可以是kwargs,案例中是一个字符串。
然后会返回一个Job类的实例,后面会具体介绍Job类的实例具体的api。
4.2启动worker ,从日志上可以看到执行了utils.hello('yangyi') utils.hello('youzan.com') 。当然这个只是简单的调用介绍,生产环境还要写的更加健壮,针对函数执行的结果进行相应的业务逻辑处理。&
root@rac2:~# &python woker.py
23:44:48 RQ worker u'rq:worker:rac2.3354' started, version 0.6.0
23:44:48 Cleaning registries for queue: default
23:44:48 *** Listening on default...
23:44:48 default: utils.hello('yangyi') (63879f7c-b453-4405-a262-b9a6b6568b68)
hello ,yangyi
23:44:48 default: Job OK (63879f7c-b453-4405-a262-b9a6b6568b68)
23:44:48 Result is kept for 500 seconds
23:44:48 *** Listening on default...
23:45:12 default: utils.hello('youzan.com') (e4e9ed62-c476-45f2-b66a-4b)
hello ,youzan.com
23:45:12 default: Job OK (e4e9ed62-c476-45f2-b66a-4b)
23:45:12 Result is kept for 500 seconds
需要说明的是其实 worker的启动顺序应该在job放入队列之前,一直监听rq里面是否有具体的任务,当然如果worker晚于job 加入队列启动,job的状态会显示为 queued 状态。
4.3 查看作业执行的情况
当任务加入队列,queue.enqueue()方法返回一个job实例。其定义位于rq.job文件中,可以去查看一下它的API,主要用到的API有:
&&& from rq import job
&&& job = q.enqueue(hello,'youzan.com')
&&& job.get_id() ##获取任务的id ,如果没有指定 ,系统会自动分配一个随机的字符串。
u'17ad0b3a-195e-49d5-8d31-02837ccf5fa6'
&&& job = q.enqueue(hello,'youzan.com')
&&& print job.get_status() ##获取任务的处理状态
&&& step1=q.enqueue(workat,) ##故意不传递参数,让函数执行失败,则获取的状态值是 failed
&&& print step1.get_status()
&&& print job.result # 当任务没有执行的时候返回None,否则返回非空值,如果 函数 hello() 有return 的值,会赋值给result
当我们把worker 监听进程停止,然后重新发布任务,查看此时任务的在队列的状态,会显示为 queued
&&& job = q.enqueue(hello,'youzan')
&&& print job.get_status()
&&& print job.to_dict() #把job实例转化成一个字典,我们主要关注状态。
{u'origin': u'default', u'status': u'queued', u'description': u"rq_test.hello('youzan')", u'created_at': 'T08:00:40Z', u'enqueued_at': 'T08:00:40Z', u'timeout': 180, u'data': '\x80\x02(X\r\x00\x00\x00rq_test.helloq\x01NU\x06youzanq\x02\x85q\x03}q\x04tq\x05.'}
&&& job.cancel() # 取消作业,尽管作业已经被执行,也可以取消
&&& print job.to_dict()
{u'origin': u'default', u'status': u'queued', u'description': u"rq_test.hello('youzan')", u'created_at': 'T08:00:40Z', u'enqueued_at': 'T08:00:40Z', u'timeout': 180, u'data': '\x80\x02(X\r\x00\x00\x00rq_test.helloq\x01NU\x06youzanq\x02\x85q\x03}q\x04tq\x05.'}
&&& print job.get_status()
&&& job.delete() # 从redis队列中删除该作业
&&& print job.get_status()
&&& print job.to_dict()
{u'origin': u'default', u'description': u"rq_test.hello('youzan')", u'created_at': 'T08:00:40Z', u'enqueued_at': 'T08:00:40Z', u'timeout': 180, u'data': '\x80\x02(X\r\x00\x00\x00rq_test.helloq\x01NU\x06youzanq\x02\x85q\x03}q\x04tq\x05.'}
五 参考文章
[4] &这位博主写了很多rq相关的实践经验,值得参考。
阅读(2001) | 评论(0) | 转发(0) |
相关热门文章
给主人留下些什么吧!~~
请登录后评论。分布式队列编程优化篇如果承担领导人角色的消费者,在执行execute()阶段得知自己将要下台,根据消息处理的原子性,该领导人可以决定是否提前终止操作。如果整个消息处理是一个原子性事务,直接终止该操作可以快速实现领导人换届。否则,前任领导必须完成当前消息处理后,才进入交接阶段。这意味着新的领导人,在inaugurate()阶段需要进行一定时间的等待。排重优化频次控制是一个经典问题。对于分布式队列编程架构,相同请求重复出现在队列的情况并不少见。如果相同请求在队列中重复太多,排重优化就显得很必要。分布式缓存更新是一个典型例子,所有请求都被发送到队列中用于缓存更新。如果请求符合典型的高斯分布,在一段时间内会出现大量重复的请求,而同时多线程更新同一请求缓存显然没有太大的意义。排重优化是一个算法,其本质是基于状态机的编程,整个讲解通过模型、构思和实施三个步骤完成。模型进行排重优化的前提是大量重复的请求。在模型这一小节,我们首先阐述重复度模型、以及不同重复度所导致的消费模型,最后基于这两个模型去讲解排重状态机。重复度模型首先我们给出最小重复长度的概念。同一请求最小重复长度:同一请求在队列中的重复出现的最小间距。例如,请求ri第一次出现在位置3,第二次出现在10,最小重复长度等于7。是否需要进行排重优化取决于队列中请求的重复度。由于不同请求之间并不存在重复的问题,不失一般性,这里的模型只考了单个请求的重复度,重复度分为三个类:无重复、稀疏重复、高重复。无重复:在整个请求过程,没有任何一个请求出现一次以上。稀疏重复:主要的请求最小重复长度大于消费队列长度。高重复:大量请求最小重复长度小于消费队列长度。对于不同的重复度,会有不同的消费模型。无重复消费模型在整个队列处理过程中,所有的请求都不相同,如下图:稀疏重复消费模型当同一请求最小重复长度大于消费者队列长度,如下图。假定有3个消费者,Consumer1将会处理r1,Consumer2将会处理r2,Consumer3将会处理r3,如果每个请求处理的时间严格相等,Consumer1在处理完r1之后,接着处理r4,Consumer2将会处理r2之后会处理r1。虽然r1被再次处理,但是任何时刻,只有这一个消费者在处理r1,不会出现多个消费者同时处理同一请求的场景。高重复消费模型如下图,仍然假定有3个消费者,队列中前面4个请求都是r1,它会同时被3个消费者线程处理:显然,对于无重复和稀疏重复的分布式队列,排重优化并不会带来额外的好处。排重优化所针对的对象是高重复消费模型,特别是对于并行处理消费者比较多的情况,重复处理同一请求,资源消耗极大。排重状态机排重优化的主要对象是高重复的队列,多个消费者线程或进程同时处理同一个幂等请求只会浪费计算资源并延迟其他待请求处理。所以,排重状态机的一个目标是处理唯一性,即:同一时刻,同一个请求只有一个消费者处理。如果消费者获取一条请求消息,但发现其他消费者正在处理该消息,则当前消费者应该处于等待状态。如果对同一请求,有一个消费者在处理,一个消费者在等待,而同一请求再次被消费者读取,再次等待则没有意义。所以,状态机的第二个目标是等待唯一性,即:同一时刻,同一个请求最多只有一个消费者处于等待状态。总上述,状态机的目标是:处理唯一性和等待唯一性。我们把正在处理的请求称为头部请求,正在等待的请求称为尾部请求。由于状态机的处理单元是请求,所以需要针对每一个请求建立一个排重状态机。基于以上要求,我们设计的排重状态机包含4个状态Init,Process,Block,Decline。各个状态之间转化过程如下图:状态机创建时处于Init状态。对Init状态进行Enqueue操作,即接收一个请求,开始处理(称为头部请求),状态机进入Process状态。状态机处于Process状态,表明当前有消费者正在处理头部请求。此时,如果进行Dequeue操作,即头部请求处理完成,返回Init状态。如果进行Enqueue操作,即另一个消费者准备处理同一个请求,状态机进入Block状态(该请求称为尾部请求)。状态机处于Block状态,表明头部请求正在处理,尾部请求处于阻塞状态。此时,进行Dequeue操作,即头部请求处理完成,返回Process状态,并且尾部请求变成头部请求,原尾部请求消费者结束阻塞状态,开始处理。进行Enqueue操作,表明一个新的消费者准备处理同一个请求,状态机进入Decline状态。状态机进入Decline状态,根据等待唯一性目标,处理最新请求的消费者将被抛弃该消息,状态机自动转换回Block状态。构思状态机描述的是针对单个请求操作所引起状态变化,排重优化需要解决队列中所有请求的排重问题,需要对所有请求的状态机进行管理。这里只考虑单虚拟机内部对所有请求状态机的管理,对于跨虚拟机的管理可以采用类似的方法。对于多状态机管理主要包含三个方面:一致性问题、完整性问题和请求缓存驱逐问题。一致性问题一致性在这里要求同一请求的不同消费者只会操作一个状态机。由于每个请求都产生一个状态机,系统将会包含大量的状态机。为了兼顾性能和一致性,我们采用ConcurrentHashMap保存所有的状态机。用ConcurrentHashMap而不是对整个状态机队列进行加锁,可以提高并行处理能力,使得系统可以同时操作不同状态机。为了避免处理同一请求的多消费者线程同时对ConcurrentHashMap进行插入所导致状态机不一致问题,我们利用了ConcurrentHashMap的putIfAbsent()方法。代码方案如下,key2Status用于存储所有的状态机。消费者在处理请求之前,从状态机队列中读取排重状态机TrafficAutomate。如果没有找到,则创建一个新的状态机,并通过putIfAbsent()方法插入到状态机队列中。private ConcurrentHashMap&T, TrafficAutomate& key2Status = new ConcurrentHashMap();
TrafficAutomate trafficAutomate = key2Status.get(key);
if(trafficAutomate == null)
trafficAutomate = new TrafficAutomate();
TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);
if(oldAutomate != null)
trafficAutomate = oldA
完整性问题完整性要求保障状态机Init,Process,Block,Decline四种状态正确、状态之间的转换也正确。由于状态机的操作非常轻量级,兼顾完整性和降低代码复杂度,我们对状态机的所有方法进行加锁。请求缓存驱逐问题(Cache Eviction)如果不同请求的数量太多,内存永久保存所有请求的状态机的内存开销太大。所以,某些状态机需要在恰当的时候被驱逐出内存。这里有两个思路:当状态机返回Init状态时,清除出队列。启动一个后台线程,定时扫描状态机队列,采用LRU等标准缓存清除机制。标识问题每个请求对应于一个状态机,不同的状态机采用不同的请求进行识别。对于同一状态机的不同消费者,在单虚拟机方案中,我们采用线程id进行标识。实施排重优化的主要功能都是通过排重状态机(TrafficAutomate)和状态机队列(QueueCoordinator)来实施的。排重状态机描述的是针对单个请求的排重问题,状态机队列解决所有请求状态机的排重问题。状态机实施(TrafficAutomate)根据状态机模型,其主要操作为enQueue和deQueue,其状态由头部请求和尾部请求的状态共同决定,所以需要定义两个变量为head和tail,用于表示头部请求和尾部请求。为了确保多线程操作下状态机的完整性(Integraty),所有的操作都将加上锁。enQueue操作当一个消费者执行enQueue操作时:如果此时尾部请求不为空,根据等待唯一性要求,返回DECLINE,当前消费者应该抛弃该请求;如果头部请求为空,返回ACCPET,当前消费者应该立刻处理该消息;否则,返回BLOCK,该消费者应该等待,并不停的查看状态机的状态,一直到头部请求处理完成。enQueue代码如下:synchronized ActionEnum enQueue(long id)
if(tail != INIT_QUEUE_ID)
return DECLINE;
if(head == INIT_QUEUE_ID)
return ACCEPT;
return BLOCK;
deQueue操作对于deQueue操作,首先将尾部请求赋值给头部请求,并将尾部请求置为无效。deQueue代码如下:synchronized boolean deQueue(long id)
tail = INIT_QUEUE_ID;
状态机队列实施(QueueCoordinator)接口定义状态机队列集中管理所有请求的排重状态机,所以其操作和单个状态机一样,即enQueue和deQueuqe接口。这两个接口的实现需要识别特定请求的状态机,所以它们的入参应该是请求。为了兼容不同类型的请求消息,我们采用了Java泛型编程。接口定义如下:public interface QueueCoordinator&T& {
public boolean enQueue(T key);
public void deQueue(T key);
enQueue操作enQueue操作过程如下:首先,根据传入的请求key值,获取状态机, 如果不存在则创建一个新的状态机,并保存在ConcurrentHashMap中。接下来,获取线程id作为该消费者的唯一标识,并对对应状态机进行enQueue操作。如果状态机返回值为ACCEPT或者DECLINE,返回业务层处理代码,ACCEPT意味着业务层需要处理该消息,DECLINE表示业务层可以抛弃当前消息。如果状态机返回值为Block,则该线程保持等待状态。异常处理。在某些情况下,头部请求线程可能由于异常,未能对状态机进行deQueue操作(作为组件提供方,不能假定所有的规范被使用方实施)。为了避免处于阻塞状态的消费者无期限地等待,建议对状态机设置安全超时时限。超过了一定时间后,状态机强制清空头部请求,返回到业务层,业务层开始处理该请求。代码如下:public boolean enQueue(T key) {
_loggingStastic();
TrafficAutomate trafficAutomate = key2Status.get(key);
if(trafficAutomate == null)
trafficAutomate = new TrafficAutomate();
TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);
if(oldAutomate != null)
trafficAutomate = oldA
long threadId = Thread.currentThread().getId();
ActionEnum action = trafficAutomate.enQueue(threadId);
if(action == DECLINE)
else if (action == ACCEPT)
//Blocking status means some other thread are working on this key, so just wait till timeout
long start = System.currentTimeMillis();
long span = 0;
_nonExceptionSleep(NAP_TIME_IN_MILL);
if(trafficAutomate.isHead(threadId))
span = System.currentTimeMillis() -
}while(span &= timeout);
//remove head so that it won't block the queue for too long
trafficAutomate.evictHeadByForce(threadId);
deQueue操作deQueue操作首先从ConcurrentHashMap获取改请求所对应的状态机,接着获取该线程的线程id,对状态机进行deQueue操作。enQueue代码如下:public void deQueue(T key) {
TrafficAutomate trafficAutomate = key2Status.get(key);
if(trafficAutomate == null)
logger.error("key {} doesn't exist ", key);
long threadId = Thread.currentThread().getId();
trafficAutomate.deQueue(threadId);
源代码完整源代码可以在获取。参考资料[1] Rabbit MQ, .[2] IBM Knowledge Center, .[3] Wikipedia, .[4] Hadoop, .[5] .[6] Lamport L, .134收藏分享举报{&debug&:false,&apiRoot&:&&,&paySDK&:&https:\u002F\u002Fpay.zhihu.com\u002Fapi\u002Fjs&,&wechatConfigAPI&:&\u002Fapi\u002Fwechat\u002Fjssdkconfig&,&name&:&production&,&instance&:&column&,&tokens&:{&X-XSRF-TOKEN&:null,&X-UDID&:null,&Authorization&:&oauth c3cef7c66aa9e6a1e3160e20&}}{&database&:{&Post&:{&&:{&isPending&:false,&contributes&:[],&title&:&分布式队列编程优化篇&,&author&:&mei-tuan-dian-ping-ji-shu-tuan-dui&,&content&:&\u003Ch1\u003E前言\u003C\u002Fh1\u003E\u003Cp\u003E“分布式队列编程”是一个系列文,之前我们已经发布了《分布式队列编程模型、实战》,主要剖析了分布式队列编程模型的需求来源、定义、结构以及其变化多样性;根据作者在新美大实际工作经验,给出了队列式编程在分布式环境下的一些具体应用。本文将重点阐述工程师运用分布式队列编程构架的时候,在生产者、分布式队列以及消费者这三个环节的注意点以及优化建议。\u003C\u002Fp\u003E\u003Cp\u003E确定采用分布式队列编程模型之后,主体架构就算完成了,但工程师的工作还远远未结束。天下事必做于细,细节是一个不错的架构向一个优秀的系统进阶的关键因素。优化篇选取了作者以及其同事在运用分布式队列编程模型架构时所碰到的典型问题和解决方案。这里些问题出现的频率较高,如果你经验不够,很可能会“踩坑”。希望通过这些讲解,帮助读者降低分布式队列编程模型的使用门槛。本文将对分布式队列编程模型的三种角色:生产者(Producer),分布式队列(Queue),消费者(Consumer)分别进行优化讨论。\u003C\u002Fp\u003E\u003Ch1\u003E生产者优化\u003C\u002Fh1\u003E\u003Cp\u003E在分布式队列编程中,生产者往往并非真正的生产源头,只是整个数据流中的一个节点,这种生产者的操作是处理-转发(Process-Forward)模式。\u003C\u002Fp\u003E\u003Cp\u003E这种模式给工程师们带来的第一个问题是吞吐量问题。这种模式下运行的生产者,一边接收上游的数据,一边将处理完的数据发送给下游。本质上,它是一个非常经典的数学问题,其抽象模型是一些没有盖子的水箱,每个水箱接收来自上一个水箱的水,进行处理之后,再将水发送到下一个水箱。工程师需要预测水源的流量、每个环节水箱的处理能力、水龙头的排水速度,最终目的是避免水溢出水箱,或者尽可能地减小溢出事件的概率。实际上流式编程框架以及其开发者花了大量的精力去处理和优化这个问题。下文的缓存优化和批量写入优化都是针对该问题的解决方案。\u003C\u002Fp\u003E\u003Cp\u003E第二个需要考虑的问题是持久化。由于各种原因,系统总是会宕机。如果信息比较敏感,例如计费信息、火车票订单信息等,工程师们需要考虑系统宕机所带来的损失,找到让损失最小化的解决方案。持久化优化重点解决这一类问题。\u003C\u002Fp\u003E\u003Ch2\u003E缓存优化\u003C\u002Fh2\u003E\u003Cp\u003E处于“处理-转发”模式下运行的生产者往往被设计成请求驱动型的服务,即每个请求都会触发一个处理线程,线程处理完后将结果写入分布式队列。如果由于某种原因队列服务不可用,或者性能恶化,随着新请求的到来,生产者的处理线程就会产生堆积。这可能会导致如下两个问题:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E系统可用性降低。由于每个线程都需要一定的内存开销,线程过多会使系统内存耗尽,甚至可能产生雪崩效应导致最终完全不可用。\u003C\u002Fli\u003E\u003Cli\u003E信息丢失。为了避免系统崩溃,工程师可能会给请求驱动型服务设置一个处理线程池,设置最大处理线程数量。这是一种典型的降级策略,目的是为了系统崩溃。但是,后续的请求会因为没有处理线程而被迫阻塞,最终可能产生信息丢失。例如:对于广告计费采集,如果采集系统因为线程耗尽而不接收客户端的计费行为,这些计费行为就会丢失。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Cp\u003E缓解这类问题的思路来自于CAP理论,即通过降低一致性来提高可用性。生产者接收线程在收到请求之后第一时间不去处理,直接将请求缓存在内存中(牺牲一致性),而在后台启动多个处理线程从缓存中读取请求、进行处理并写入分布式队列。与线程所占用的内存开销相比,大部分的请求所占内存几乎可以忽略。通过在接收请求和处理请求之间增加一层内存缓存,可以大大提高系统的处理吞吐量和可扩展性。这个方案本质上是一个内存生产者消费者模型。\u003C\u002Fp\u003E\u003Ch2\u003E批量写入优化\u003C\u002Fh2\u003E\u003Cp\u003E如果生产者的请求过大,写分布式队列可能成为性能瓶颈,有如下几个因素:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E队列自身性能不高。\u003C\u002Fli\u003E\u003Cli\u003E分布式队列编程模型往往被应用在跨机房的系统里面,跨机房的网络开销往往容易成为系统瓶颈。\u003C\u002Fli\u003E\u003Cli\u003E消息确认机制往往会大大降低队列的吞吐量以及响应时间。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Cp\u003E如果在处理请求和写队列之间添加一层缓存,消息写入程序批量将消息写入队列,可以大大提高系统的吞吐量。原因如下:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E批量写队列可以大大减少生产者和分布式队列的交互次数和消息传输量。特别是对于高吞吐小载荷的消息实体,批量写可以显著降低网络传输量。\u003C\u002Fli\u003E\u003Cli\u003E对于需要确认机制的消息,确认机制往往会大大降低队列的吞吐量以及响应时间,某些高敏感的消息需要多个消息中间件代理同时确认,这近一步恶化性能。在生产者的应用层将多条消息批量组合成一个消息体,消息中间件就只需要对批量消息进行一次确认,这可能会数量级的提高消息传输性能。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Ch2\u003E持久化优化\u003C\u002Fh2\u003E\u003Cp\u003E通过添加缓存,消费者服务的吞吐量和可用性都得到了提升。但缓存引入了一个新问题——内存数据丢失。对于敏感数据,工程师需要考虑如下两个潜在问题:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E如果内存中存在未处理完的请求,而某些原因导致生产者服务宕机,内存数据就会丢失而可能无法恢复。\u003C\u002Fli\u003E\u003Cli\u003E如果分布式队列长时间不可用,随着请求数量的不断增加,最终系统内存可能会耗尽而崩溃,内存的消息也可能丢失。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Cp\u003E所以缓存中的数据需要定期被持久化到磁盘等持久层设备中,典型的持久化触发策略主要有两种:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E定期触发,即每隔一段时间进行一次持久化。\u003C\u002Fli\u003E\u003Cli\u003E定量触发,即每当缓存中的请求数量达到一定阈值后进行持久化。\u003Cbr\u003E是否需要持久化优化,以及持久化策略应该由请求数据的敏感度、请求量、持久化性能等因素共同决定。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Ch1\u003E中间件选型\u003C\u002Fh1\u003E\u003Cp\u003E分布式队列不等同于各种开源的或者收费的消息中间件,甚至在一些场景下完全不需要使用消息中间件。但是,消息中间件产生的目的就是解决消息传递问题,这为分布式队列编程架构提供了很多的便利。在实际工作中,工程师们应该将成熟的消息中间件作为队列的首要备选方案。\u003Cbr\u003E本小节对消息中间件的功能、模型进行阐述,并给出一些消息中间件选型、部署的具体建议。\u003C\u002Fp\u003E\u003Ch2\u003E中间件的功能\u003C\u002Fh2\u003E\u003Cp\u003E明白一个系统的每个具体功能是设计和架构一个系统的基础。典型的消息中间件主要包含如下几个功能:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E消息接收\u003C\u002Fli\u003E\u003Cli\u003E消息分发\u003C\u002Fli\u003E\u003Cli\u003E消息存储\u003C\u002Fli\u003E\u003Cli\u003E消息读取\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Ch2\u003E概念模型\u003C\u002Fh2\u003E\u003Cp\u003E抽象的消息中间件模型包含如下几个角色:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E发送者和接收者客户端(Sender\u002FReceiver Client),在具体实施过程中,它们一般以库的形式嵌入到应用程序代码中。\u003C\u002Fli\u003E\u003Cli\u003E代理服务器(Broker Server),它们是与客户端代码直接交互的服务端代码。\u003C\u002Fli\u003E\u003Cli\u003E消息交换机(Exchanger),接收到的消息一般需要通过消息交换机(Exchanger)分发到具体的消息队列中。\u003C\u002Fli\u003E\u003Cli\u003E消息队列,一般是一块内存数据结构或持久化数据。\u003Cbr\u003E概念模型如下图:\u003Cbr\u003E\u003Cfigure\u003E\u003Cnoscript\u003E\u003Cimg src=\&https:\u002F\u002Fpic3.zhimg.com\u002Fdbbcc30d6411c4aaa5f073f_b.jpg\& data-rawwidth=\&1157\& data-rawheight=\&249\& class=\&origin_image zh-lightbox-thumb\& width=\&1157\& data-original=\&https:\u002F\u002Fpic4.zhimg.com\u002Fdbbcc30d6411c4aaa5f073f_r.jpg\&\u003E\u003C\u002Fnoscript\u003E\u003Cimg src=\&data:image\u002Fsvg+utf8,&svg%20xmlns='http:\u002F\u002Fwww.w3.org\u002FFsvg'%20width='1157'%20height='249'&&\u002Fsvg&\& data-rawwidth=\&1157\& data-rawheight=\&249\& class=\&origin_image zh-lightbox-thumb lazy\& width=\&1157\& data-original=\&https:\u002F\u002Fpic4.zhimg.com\u002Fdbbcc30d6411c4aaa5f073f_r.jpg\& data-actualsrc=\&https:\u002F\u002Fpic3.zhimg.com\u002Fdbbcc30d6411c4aaa5f073f_b.jpg\&\u003E\u003C\u002Ffigure\u003E为了提高分发性能,很多消息中间件把消息代理服务器的拓扑图发送到发送者和接收者客户端(Sender\u002FReceiver Client),如此一来,发送源可以直接进行消息分发。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Ch2\u003E选型标准\u003C\u002Fh2\u003E\u003Cp\u003E要完整的描述消息中间件各个方面非常困难,大部分良好的消息中间件都有完善的文档,这些文档的长度远远超过本文的总长度。但如下几个标准是工程师们在进行消息中间件选型时经常需要考虑和权衡的。\u003C\u002Fp\u003E\u003Ch3\u003E性能\u003C\u002Fh3\u003E\u003Cp\u003E性能主要有两个方面需要考虑:吞吐量(Throughput)和响应时间(Latency)。\u003Cbr\u003E不同的消息队列中间件的吞吐量和响应时间相差甚远,在选型时可以去网上查看一些性能对比报告。\u003Cbr\u003E对于同一种中间件,不同的配置方式也会影响性能。主要有如下几方面的配置:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E是否需要确认机制,即写入队列后,或从队列读取后,是否需要进行确认。确认机制对响应时间的影响往往很大。\u003C\u002Fli\u003E\u003Cli\u003E能否批处理,即消息能否批量读取或者写入。批量操作可以大大减少应用程序与消息中间件的交互次数和消息传递量,大大提高吞吐量。\u003C\u002Fli\u003E\u003Cli\u003E能否进行分区(Partition)。将某一主题消息队列进行分区,同一主题消息可以有多台机器并行处理。这不仅仅能影响消息中间件的吞吐量,还决定着消息中间件是否具备良好的可伸缩性(Scalability)。\u003C\u002Fli\u003E\u003Cli\u003E是否需要进行持久化。将消息进行持久化往往会同时影响吞吐量和响应时间。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Ch3\u003E可靠性\u003C\u002Fh3\u003E\u003Cp\u003E可靠性主要包含:可用性、持久化、确认机制等。\u003Cbr\u003E高可用性的消息中间件应该具备如下特征:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E消息中间件代理服务器(Broker)具有主从备份。即当一台代理服务宕机之后,备用服务器能接管相关的服务。\u003C\u002Fli\u003E\u003Cli\u003E消息中间件中缓存的消息是否有备份、并持久化。\u003Cbr\u003E根据CAP理论,高可用、高一致性以及网络分裂不可兼得。根据作者的观察,大部分的消息中间件在面临网络分裂的情况下下,都很难保证数据的一致性以及可用性。 很多消息中间件都会提供一些可配置策略,让使用者在可用性和一致性之间做权衡。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Cp\u003E高可靠的消息中间件应该确保从发送者接收到的消息不会丢失。中间件代理服务器的宕机并不是小概率事件,所以保存在内存中的消息很容易发生丢失。大部分的消息中间件都依赖于消息的持久化去降低消息丢失损失,即将接收到的消息写入磁盘。即使提供持久化,仍有两个问题需要考虑:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E磁盘损坏问题。长时间来看,磁盘出问题的概率仍然存在。\u003C\u002Fli\u003E\u003Cli\u003E性能问题。与操作内存相比,磁盘I\u002FO的操作性能要慢几个数量级。频繁持久化不仅会增加响应时间,也会降低吞吐量。\u003Cbr\u003E解决这两个问题的一个解决方案就是:多机确认,定期持久化。即消息被缓存在多台机器的内存中,只有每台机器都确认收到消息,才跟发送者确认(很多消息中间件都会提供相应的配置选项,让用户设置最少需要多少台机器接收到消息)。由于多台独立机器同时出故障的概率遵循乘法法则,指数级降低,这会大大提高消息中间件的可靠性。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Cp\u003E确认机制本质上是通讯的握手机制(Handshaking)。如果没有该机制,消息在传输过程中丢失将不会被发现。高敏感的消息要求选取具备确认机制的消息中间件。当然如果没有接收到消息中间件确认完成的指令,应用程序需要决定如何处理。典型的做法有两个:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E多次重试。\u003C\u002Fli\u003E\u003Cli\u003E暂存到本地磁盘或其它持久化媒介。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Ch3\u003E客户端接口所支持语言\u003C\u002Fh3\u003E\u003Cp\u003E采用现存消息中间件就意味着避免重复造轮子。如果某个消息中间件未能提供对应语言的客户端接口,则意味着极大的成本和兼容性问题。\u003C\u002Fp\u003E\u003Ch3\u003E投递策略(Delivery policies)\u003C\u002Fh3\u003E\u003Cp\u003E投递策略指的是一个消息会被发送几次。主要包含三种策略:最多一次(At most Once )、最少一次(At least Once)、仅有一次(Exactly Once)。\u003Cbr\u003E在实际应用中,只考虑消息中间件的投递策略并不能保证业务的投递策略,因为接收者在确认收到消息和处理完消息并持久化之间存在一个时间窗口。例如,即使消息中间件保证仅有一次(Exactly Once),如果接收者先确认消息,在持久化之前宕机,则该消息并未被处理。从应用的角度,这就是最多一次(At most Once)。反之,接收者先处理消息并完成持久化,但在确认之前宕机,消息就要被再次发送,这就是最少一次(At least Once)。 如果消息投递策略非常重要,应用程序自身也需要仔细设计。\u003C\u002Fp\u003E\u003Ch1\u003E消费者优化\u003C\u002Fh1\u003E\u003Cp\u003E消费者是分布式队列编程中真正的数据处理方,数据处理方最常见的挑战包括:有序性、串行化(Serializability)、频次控制、完整性和一致性等。\u003C\u002Fp\u003E\u003Ch2\u003E挑战\u003C\u002Fh2\u003E\u003Ch3\u003E有序性\u003C\u002Fh3\u003E\u003Cp\u003E在很多场景下,如何保证队列信息的有序处理是一个棘手的问题。如下图,假定分布式队列保证请求严格有序,请求ri2和ri1都是针对同一数据记录的不同状态,ri2的状态比ri1的状态新。T1、T2、T3和T4代表各个操作发生的时间,并且 T1 & T2 & T3 & T4(\&&\&代表早于)。\u003Cbr\u003E采用多消费者架构,这两条记录被两个消费者(Consumer1和Consumer2)处理后更新到数据库里面。Consumer1虽然先读取ri1但是却后写入数据库,这就导致,新的状态被老的状态覆盖,所以多消费者不保证数据的有序性。\u003Cbr\u003E\u003C\u002Fp\u003E\u003Cfigure\u003E\u003Cnoscript\u003E\u003Cimg src=\&https:\u002F\u002Fpic1.zhimg.com\u002Fd9b73cae177cbf02e020d_b.jpg\& data-rawwidth=\&472\& data-rawheight=\&231\& class=\&origin_image zh-lightbox-thumb\& width=\&472\& data-original=\&https:\u002F\u002Fpic2.zhimg.com\u002Fd9b73cae177cbf02e020d_r.jpg\&\u003E\u003C\u002Fnoscript\u003E\u003Cimg src=\&data:image\u002Fsvg+utf8,&svg%20xmlns='http:\u002F\u002Fwww.w3.org\u002FFsvg'%20width='472'%20height='231'&&\u002Fsvg&\& data-rawwidth=\&472\& data-rawheight=\&231\& class=\&origin_image zh-lightbox-thumb lazy\& width=\&472\& data-original=\&https:\u002F\u002Fpic2.zhimg.com\u002Fd9b73cae177cbf02e020d_r.jpg\& data-actualsrc=\&https:\u002F\u002Fpic1.zhimg.com\u002Fd9b73cae177cbf02e020d_b.jpg\&\u003E\u003C\u002Ffigure\u003E\u003Ch3\u003E串行化\u003C\u002Fh3\u003E\u003Cp\u003E很多场景下,串行化是数据处理的一个基本需求,这是保证数据完整性、可恢复性、事务原子性等的基础。为了在并行计算系统里实现串行化,一系列的相关理论和实践算法被提出。对于分布式队列编程架构,要在在多台消费者实现串行化非常复杂,无异于重复造轮子。\u003C\u002Fp\u003E\u003Ch3\u003E频次控制\u003C\u002Fh3\u003E\u003Cp\u003E有时候,消费者的消费频次需要被控制,可能的原因包括:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E费用问题。如果每次消费所引起的操作都需要收费,而同一个请求消息在队列中保存多份,不进行频次控制,就会导致无谓的浪费。\u003C\u002Fli\u003E\u003Cli\u003E性能问题。每次消费可能会引起对其他服务的调用,被调用服务希望对调用量有所控制,对同一个请求消息的多次访问就需要有所控制。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Ch3\u003E完整性和一致性\u003C\u002Fh3\u003E\u003Cp\u003E完整性和一致性是所有多线程和多进程的代码都面临的问题。在多线程或者多进程的系统中考虑完整性和一致性往往会大大地增加代码的复杂度和系统出错的概率。\u003C\u002Fp\u003E\u003Ch2\u003E单例服务优化\u003C\u002Fh2\u003E\u003Cp\u003E几乎所有串行化理论真正解决的问题只有一个:性能。 所以,在性能允许的前提下,对于消费者角色,建议采用单实例部署。通过单实例部署,有序性、串行化、完整性和一致性问题自动获得了解决。另外,单实例部署的消费者拥有全部所需信息,它可以在频次控制上采取很多优化策略。\u003C\u002Fp\u003E\u003Cp\u003E天下没有免费的午餐。同样,单实例部署并非没有代价,它意味着系统可用性的降低,很多时候,这是无法接受的。解决可用性问题的最直接的思路就是冗余(Redundancy)。最常用的冗余方案是Master-slave架构,不过大部分的Master-slave架构都是Active\u002Factive模式,即主从服务器都提供服务。例如,数据库的Master-slave架构就是主从服务器都提供读服务,只有主服务器提供写服务。大部分基于负载均衡设计的Master-slave集群中,主服务器和从服务器同时提供相同的服务。这显然不满足单例服务优化需求。有序性和串行化需要Active\u002Fpassive架构,即在某一时刻只有主实例提供服务,其他的从服务等待主实例失效。这是典型的领导人选举架构,即只有获得领导权的实例才能充当实际消费者,其他实例都在等待下一次选举。采用领导人选举的Active\u002Fpassive架构可以大大缓解纯粹的单实例部署所带来的可用性问题。\u003C\u002Fp\u003E\u003Cp\u003E令人遗憾的是,除非工程师们自己在消费者实例里面实现Paxos等算法,并在每次消息处理之前都执行领导人选举。否则,理论上讲,没有方法可以保障在同一个时刻只有一个领导者。而对每个消息都执行一次领导人选举,显然性能不可行。实际工作中,最容易出现的问题时机发生在领导人交接过程中,即前任领导人实例变成辅助实例,新部署实例开始承担领导人角色。为了平稳过渡,这两者之间需要有一定的通讯机制,但是,无论是网络分区(Network partition)还是原领导人服务崩溃都会使这种通讯机制变的不可能。\u003C\u002Fp\u003E\u003Cp\u003E对于完整性和一致性要求很高的系统,我们需要在选举制度和交接制度这两块进行优化。\u003C\u002Fp\u003E\u003Ch3\u003E领导人选举架构\u003C\u002Fh3\u003E\u003Cp\u003E典型的领导人选举算法有Paxos、ZAB( ZooKeeper Atomic Broadcast protocol)。为了避免重复造轮子,建议采用ZooKeeper的分布式锁来实现领导人选举。典型的ZooKeeper实现算法如下(摘自参考资料[4]):\u003C\u002Fp\u003E\u003Cp\u003E\u003Cem\u003ELet ELECTION be a path of choice of the application. To volunteer to be a leader:\u003C\u002Fem\u003E\u003C\u002Fp\u003E\u003Cp\u003E\u003Cem\u003E1.Create znode z with path \&ELECTION\u002Fguid-n_\& with both SEQUENCE and EPHEMERAL\u003C\u002Fem\u003E\u003Cbr\u003E\u003Cem\u003E2.Let C be the children of \&ELECTION\&, and i be the \u003C\u002Fem\u003E\u003Cbr\u003E\u003Cem\u003E3.Watch for changes on \&ELECTION\u002Fguid-n_j\&, where j is the largest sequence number such that j & i and n_j is a znode in C;\u003C\u002Fem\u003E\u003C\u002Fp\u003E\u003Cp\u003E\u003Cem\u003EUpon receiving a notification of znode deletion:\u003C\u002Fem\u003E\u003C\u002Fp\u003E\u003Cp\u003E\u003Cem\u003E1.Let C be the new set of children of ELECTION;\u003C\u002Fem\u003E\u003Cbr\u003E\u003Cem\u003E2.If z is the smallest node in C, then exec\u003C\u002Fem\u003E\u003Cbr\u003E\u003Cem\u003E3.Otherwise, watch for changes on \&ELECTION\u002Fguid-n_j\&, where j is the largest sequence number such that j & i and n_j is a znode in C;\u003C\u002Fem\u003E\u003C\u002Fp\u003E\u003Ch3\u003E领导人交接架构\u003C\u002Fh3\u003E\u003Cp\u003E领导人选举的整个过程发生在ZooKeeper集群中,各个消费者实例在这场选举中只充当被告知者角色(Learner)。领导人选举算法,只能保证最终只有一个Leader被选举出来,并不保障被告知者对Leader的理解是完全一致的。本质上,上文的架构里,选举的结果是作为令牌(Token)传递给消费者实例,消费者将自身的ID与令牌进行对比,如果相等,则开始执行消费操作。所以当发生领导人换届的情况,不同的Learner获知新Leader的时间并不同。例如,前任Leader如果因为网络问题与ZooKeeper集群断开,前任Leader只能在超时后才能判断自己是否不再承担Leader角色了,而新的Leader可能在这之前已经产生。另一方面,即使前任Leader和新Leader同时接收到新Leader选举结果,某些业务的完整性要求迫使前任Leader仍然完成当前未完成的工作。以上的讲解非常抽象,生活中却给了一些更加具体的例子。众所周知,美国总统候选人在选举结束后并不直接担任美国总统,从选举到最终承担总统角色需要一个过渡期。对于新当选Leader的候选人而言,过渡期间称之为加冕阶段(Inauguration)。对于即将卸任的Leader,过渡期称为交接阶段(HandOver)。所以一个基于领导人选举的消费者从加冕到卸任经历三个阶段:Inauguration、Execution、HandOver。在加冕阶段,新领导需要进行一些初始化操作。Execution阶段是真正的队列消息处理阶段。在交接阶段,前任领导需要进行一些清理操作。\u003C\u002Fp\u003E\u003Cp\u003E类似的,为了解决领导人交接问题,所有的消费者从代码实现的角度都需要实现类似ILeaderCareer接口。这个接口包含三个方发inaugurate(),handOver()和execute()。某个部署实例(Learner)在得知自己承担领导人角色后,需要调用inaugurate()方法,进行加冕。主要的消费逻辑通过不停的执行execute()实现,当确认自己不再承担领导人之后,执行handOver()进行交接。\u003C\u002Fp\u003E\u003Cdiv class=\&highlight\&\u003E\u003Cpre\u003E\u003Ccode class=\&language-text\&\u003E\u003Cspan\u003E\u003C\u002Fspan\u003Epublic interface ILeaderCareer {\n
public void inaugurate();\n
public void handOver();\n
public boolean execute();\n}\n\u003C\u002Fcode\u003E\u003C\u002Fpre\u003E\u003C\u002Fdiv\u003E\u003Cp\u003E如果承担领导人角色的消费者,在执行execute()阶段得知自己将要下台,根据消息处理的原子性,该领导人可以决定是否提前终止操作。如果整个消息处理是一个原子性事务,直接终止该操作可以快速实现领导人换届。否则,前任领导必须完成当前消息处理后,才进入交接阶段。这意味着新的领导人,在inaugurate()阶段需要进行一定时间的等待。\u003C\u002Fp\u003E\u003Ch2\u003E排重优化\u003C\u002Fh2\u003E\u003Cp\u003E频次控制是一个经典问题。对于分布式队列编程架构,相同请求重复出现在队列的情况并不少见。如果相同请求在队列中重复太多,排重优化就显得很必要。分布式缓存更新是一个典型例子,所有请求都被发送到队列中用于缓存更新。如果请求符合典型的高斯分布,在一段时间内会出现大量重复的请求,而同时多线程更新同一请求缓存显然没有太大的意义。\u003Cbr\u003E排重优化是一个算法,其本质是基于状态机的编程,整个讲解通过模型、构思和实施三个步骤完成。\u003C\u002Fp\u003E\u003Ch3\u003E模型\u003C\u002Fh3\u003E\u003Cp\u003E进行排重优化的前提是大量重复的请求。在模型这一小节,我们首先阐述重复度模型、以及不同重复度所导致的消费模型,最后基于这两个模型去讲解排重状态机。\u003C\u002Fp\u003E\u003Ch4\u003E重复度模型\u003C\u002Fh4\u003E\u003Cp\u003E首先我们给出最小重复长度的概念。同一请求最小重复长度:同一请求在队列中的重复出现的最小间距。例如,请求ri第一次出现在位置3,第二次出现在10,最小重复长度等于7。\u003Cbr\u003E是否需要进行排重优化取决于队列中请求的重复度。由于不同请求之间并不存在重复的问题,不失一般性,这里的模型只考了单个请求的重复度,重复度分为三个类:无重复、稀疏重复、高重复。\u003Cbr\u003E无重复:在整个请求过程,没有任何一个请求出现一次以上。\u003Cbr\u003E稀疏重复:主要的请求最小重复长度大于消费队列长度。\u003Cbr\u003E高重复:大量请求最小重复长度小于消费队列长度。\u003Cbr\u003E对于不同的重复度,会有不同的消费模型。\u003C\u002Fp\u003E\u003Ch4\u003E无重复消费模型\u003C\u002Fh4\u003E\u003Cp\u003E在整个队列处理过程中,所有的请求都不相同,如下图:\u003Cbr\u003E\u003C\u002Fp\u003E\u003Cfigure\u003E\u003Cnoscript\u003E\u003Cimg src=\&https:\u002F\u002Fpic2.zhimg.com\u002Feea06af9e0edd1557382e_b.jpg\& data-rawwidth=\&485\& data-rawheight=\&192\& class=\&origin_image zh-lightbox-thumb\& width=\&485\& data-original=\&https:\u002F\u002Fpic3.zhimg.com\u002Feea06af9e0edd1557382e_r.jpg\&\u003E\u003C\u002Fnoscript\u003E\u003Cimg src=\&data:image\u002Fsvg+utf8,&svg%20xmlns='http:\u002F\u002Fwww.w3.org\u002FFsvg'%20width='485'%20height='192'&&\u002Fsvg&\& data-rawwidth=\&485\& data-rawheight=\&192\& class=\&origin_image zh-lightbox-thumb lazy\& width=\&485\& data-original=\&https:\u002F\u002Fpic3.zhimg.com\u002Feea06af9e0edd1557382e_r.jpg\& data-actualsrc=\&https:\u002F\u002Fpic2.zhimg.com\u002Feea06af9e0edd1557382e_b.jpg\&\u003E\u003C\u002Ffigure\u003E\u003Ch4\u003E稀疏重复消费模型\u003C\u002Fh4\u003E\u003Cp\u003E当同一请求最小重复长度大于消费者队列长度,如下图。假定有3个消费者,Consumer1将会处理r1,Consumer2将会处理r2,Consumer3将会处理r3,如果每个请求处理的时间严格相等,Consumer1在处理完r1之后,接着处理r4,Consumer2将会处理r2之后会处理r1。虽然r1被再次处理,但是任何时刻,只有这一个消费者在处理r1,不会出现多个消费者同时处理同一请求的场景。\u003Cbr\u003E\u003C\u002Fp\u003E\u003Cfigure\u003E\u003Cnoscript\u003E\u003Cimg src=\&https:\u002F\u002Fpic3.zhimg.com\u002F60a4438bed1ea9ea7d5a1d5_b.jpg\& data-rawwidth=\&529\& data-rawheight=\&192\& class=\&origin_image zh-lightbox-thumb\& width=\&529\& data-original=\&https:\u002F\u002Fpic2.zhimg.com\u002F60a4438bed1ea9ea7d5a1d5_r.jpg\&\u003E\u003C\u002Fnoscript\u003E\u003Cimg src=\&data:image\u002Fsvg+utf8,&svg%20xmlns='http:\u002F\u002Fwww.w3.org\u002FFsvg'%20width='529'%20height='192'&&\u002Fsvg&\& data-rawwidth=\&529\& data-rawheight=\&192\& class=\&origin_image zh-lightbox-thumb lazy\& width=\&529\& data-original=\&https:\u002F\u002Fpic2.zhimg.com\u002F60a4438bed1ea9ea7d5a1d5_r.jpg\& data-actualsrc=\&https:\u002F\u002Fpic3.zhimg.com\u002F60a4438bed1ea9ea7d5a1d5_b.jpg\&\u003E\u003C\u002Ffigure\u003E\u003Ch4\u003E高重复消费模型\u003C\u002Fh4\u003E\u003Cp\u003E如下图,仍然假定有3个消费者,队列中前面4个请求都是r1,它会同时被3个消费者线程处理:\u003C\u002Fp\u003E\u003Cp\u003E\u003Cfigure\u003E\u003Cnoscript\u003E\u003Cimg src=\&https:\u002F\u002Fpic2.zhimg.com\u002F90ca600f2f849b93a23778_b.jpg\& data-rawwidth=\&485\& data-rawheight=\&192\& class=\&origin_image zh-lightbox-thumb\& width=\&485\& data-original=\&https:\u002F\u002Fpic1.zhimg.com\u002F90ca600f2f849b93a23778_r.jpg\&\u003E\u003C\u002Fnoscript\u003E\u003Cimg src=\&data:image\u002Fsvg+utf8,&svg%20xmlns='http:\u002F\u002Fwww.w3.org\u002FFsvg'%20width='485'%20height='192'&&\u002Fsvg&\& data-rawwidth=\&485\& data-rawheight=\&192\& class=\&origin_image zh-lightbox-thumb lazy\& width=\&485\& data-original=\&https:\u002F\u002Fpic1.zhimg.com\u002F90ca600f2f849b93a23778_r.jpg\& data-actualsrc=\&https:\u002F\u002Fpic2.zhimg.com\u002F90ca600f2f849b93a23778_b.jpg\&\u003E\u003C\u002Ffigure\u003E显然,对于无重复和稀疏重复的分布式队列,排重优化并不会带来额外的好处。排重优化所针对的对象是高重复消费模型,特别是对于并行处理消费者比较多的情况,重复处理同一请求,资源消耗极大。\u003C\u002Fp\u003E\u003Ch4\u003E排重状态机\u003C\u002Fh4\u003E\u003Cp\u003E排重优化的主要对象是高重复的队列,多个消费者线程或进程同时处理同一个幂等请求只会浪费计算资源并延迟其他待请求处理。所以,排重状态机的一个目标是处理唯一性,即:同一时刻,同一个请求只有一个消费者处理。如果消费者获取一条请求消息,但发现其他消费者正在处理该消息,则当前消费者应该处于等待状态。如果对同一请求,有一个消费者在处理,一个消费者在等待,而同一请求再次被消费者读取,再次等待则没有意义。所以,状态机的第二个目标是等待唯一性,即:同一时刻,同一个请求最多只有一个消费者处于等待状态。总上述,状态机的目标是:处理唯一性和等待唯一性。我们把正在处理的请求称为头部请求,正在等待的请求称为尾部请求。\u003Cbr\u003E由于状态机的处理单元是请求,所以需要针对每一个请求建立一个排重状态机。基于以上要求,我们设计的排重状态机包含4个状态Init,Process,Block,Decline。各个状态之间转化过程如下图:\u003Cbr\u003E\u003C\u002Fp\u003E\u003Cfigure\u003E\u003Cnoscript\u003E\u003Cimg src=\&https:\u002F\u002Fpic1.zhimg.com\u002Fcaca39e212c8af5adf00b1_b.jpg\& data-rawwidth=\&302\& data-rawheight=\&351\& class=\&content_image\& width=\&302\&\u003E\u003C\u002Fnoscript\u003E\u003Cimg src=\&data:image\u002Fsvg+utf8,&svg%20xmlns='http:\u002F\u002Fwww.w3.org\u002FFsvg'%20width='302'%20height='351'&&\u002Fsvg&\& data-rawwidth=\&302\& data-rawheight=\&351\& class=\&content_image lazy\& width=\&302\& data-actualsrc=\&https:\u002F\u002Fpic1.zhimg.com\u002Fcaca39e212c8af5adf00b1_b.jpg\&\u003E\u003C\u002Ffigure\u003E\u003Col\u003E\u003Cli\u003E状态机创建时处于Init状态。\u003C\u002Fli\u003E\u003Cli\u003E对Init状态进行Enqueue操作,即接收一个请求,开始处理(称为头部请求),状态机进入Process状态。\u003C\u002Fli\u003E\u003Cli\u003E状态机处于Process状态,表明当前有消费者正在处理头部请求。此时,如果进行Dequeue操作,即头部请求处理完成,返回Init状态。如果进行Enqueue操作,即另一个消费者准备处理同一个请求,状态机进入Block状态(该请求称为尾部请求)。\u003C\u002Fli\u003E\u003Cli\u003E状态机处于Block状态,表明头部请求正在处理,尾部请求处于阻塞状态。此时,进行Dequeue操作,即头部请求处理完成,返回Process状态,并且尾部请求变成头部请求,原尾部请求消费者结束阻塞状态,开始处理。进行Enqueue操作,表明一个新的消费者准备处理同一个请求,状态机进入Decline状态。\u003C\u002Fli\u003E\u003Cli\u003E状态机进入Decline状态,根据等待唯一性目标,处理最新请求的消费者将被抛弃该消息,状态机自动转换回Block状态。\u003C\u002Fli\u003E\u003C\u002Fol\u003E\u003Ch3\u003E构思\u003C\u002Fh3\u003E\u003Cp\u003E状态机描述的是针对单个请求操作所引起状态变化,排重优化需要解决队列中所有请求的排重问题,需要对所有请求的状态机进行管理。这里只考虑单虚拟机内部对所有请求状态机的管理,对于跨虚拟机的管理可以采用类似的方法。对于多状态机管理主要包含三个方面:一致性问题、完整性问题和请求缓存驱逐问题。\u003C\u002Fp\u003E\u003Ch4\u003E一致性问题\u003C\u002Fh4\u003E\u003Cp\u003E一致性在这里要求同一请求的不同消费者只会操作一个状态机。由于每个请求都产生一个状态机,系统将会包含大量的状态机。为了兼顾性能和一致性,我们采用ConcurrentHashMap保存所有的状态机。用ConcurrentHashMap而不是对整个状态机队列进行加锁,可以提高并行处理能力,使得系统可以同时操作不同状态机。为了避免处理同一请求的多消费者线程同时对ConcurrentHashMap进行插入所导致状态机不一致问题,我们利用了ConcurrentHashMap的putIfAbsent()方法。代码方案如下,key2Status用于存储所有的状态机。消费者在处理请求之前,从状态机队列中读取排重状态机TrafficAutomate。如果没有找到,则创建一个新的状态机,并通过putIfAbsent()方法插入到状态机队列中。\u003C\u002Fp\u003E\u003Cdiv class=\&highlight\&\u003E\u003Cpre\u003E\u003Ccode class=\&language-text\&\u003E\u003Cspan\u003E\u003C\u002Fspan\u003Eprivate ConcurrentHashMap&T, TrafficAutomate& key2Status = new ConcurrentHashMap();\nTrafficAutomate trafficAutomate = key2Status.get(key);\nif(trafficAutomate == null)\n{\n
trafficAutomate = new TrafficAutomate();\n
TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);\n
if(oldAutomate != null)\n
trafficAutomate = oldA\n
}\n}\n\u003C\u002Fcode\u003E\u003C\u002Fpre\u003E\u003C\u002Fdiv\u003E\u003Ch4\u003E完整性问题\u003C\u002Fh4\u003E\u003Cp\u003E完整性要求保障状态机Init,Process,Block,Decline四种状态正确、状态之间的转换也正确。由于状态机的操作非常轻量级,兼顾完整性和降低代码复杂度,我们对状态机的所有方法进行加锁。\u003C\u002Fp\u003E\u003Ch4\u003E请求缓存驱逐问题(Cache Eviction)\u003C\u002Fh4\u003E\u003Cp\u003E如果不同请求的数量太多,内存永久保存所有请求的状态机的内存开销太大。所以,某些状态机需要在恰当的时候被驱逐出内存。这里有两个思路:\u003C\u002Fp\u003E\u003Cul\u003E\u003Cli\u003E当状态机返回Init状态时,清除出队列。\u003C\u002Fli\u003E\u003Cli\u003E启动一个后台线程,定时扫描状态机队列,采用LRU等标准缓存清除机制。\u003C\u002Fli\u003E\u003C\u002Ful\u003E\u003Ch4\u003E标识问题\u003C\u002Fh4\u003E\u003Cp\u003E每个请求对应于一个状态机,不同的状态机采用不同的请求进行识别。\u003Cbr\u003E对于同一状态机的不同消费者,在单虚拟机方案中,我们采用线程id进行标识。\u003C\u002Fp\u003E\u003Ch3\u003E实施\u003C\u002Fh3\u003E\u003Cp\u003E排重优化的主要功能都是通过排重状态机(TrafficAutomate)和状态机队列(QueueCoordinator)来实施的。排重状态机描述的是针对单个请求的排重问题,状态机队列解决所有请求状态机的排重问题。\u003C\u002Fp\u003E\u003Ch4\u003E状态机实施(TrafficAutomate)\u003C\u002Fh4\u003E\u003Cp\u003E根据状态机模型,其主要操作为enQueue和deQueue,其状态由头部请求和尾部请求的状态共同决定,所以需要定义两个变量为head和tail,用于表示头部请求和尾部请求。为了确保多线程操作下状态机的完整性(Integraty),所有的操作都将加上锁。\u003C\u002Fp\u003EenQueue操作\u003Cp\u003E当一个消费者执行enQueue操作时:如果此时尾部请求不为空,根据等待唯一性要求,返回DECLINE,当前消费者应该抛弃该请求;如果头部请求为空,返回ACCPET,当前消费者应该立刻处理该消息;否则,返回BLOCK,该消费者应该等待,并不停的查看状态机的状态,一直到头部请求处理完成。enQueue代码如下:\u003C\u002Fp\u003E\u003Cdiv class=\&highlight\&\u003E\u003Cpre\u003E\u003Ccode class=\&language-text\&\u003E\u003Cspan\u003E\u003C\u002Fspan\u003Esynchronized ActionEnum enQueue(long id)\n{ \n
if(tail != INIT_QUEUE_ID)\n
return DECLINE;\n
if(head == INIT_QUEUE_ID)\n
return ACCEPT;\n
return BLOCK;\n
}\n}\n\u003C\u002Fcode\u003E\u003C\u002Fpre\u003E\u003C\u002Fdiv\u003EdeQueue操作\u003Cp\u003E对于deQueue操作,首先将尾部请求赋值给头部请求,并将尾部请求置为无效。deQueue代码如下:\u003C\u002Fp\u003E\u003Cdiv class=\&highlight\&\u003E\u003Cpre\u003E\u003Ccode class=\&language-text\&\u003E\u003Cspan\u003E\u003C\u002Fspan\u003Esynchronized boolean deQueue(long id)\n{\n
tail = INIT_QUEUE_ID;\\n}\n\u003C\u002Fcode\u003E\u003C\u002Fpre\u003E\u003C\u002Fdiv\u003E\u003Ch4\u003E状态机队列实施(QueueCoordinator)\u003C\u002Fh4\u003E接口定义\u003Cp\u003E状态机队列集中管理所有请求的排重状态机,所以其操作和单个状态机一样,即enQueue和deQueuqe接口。这两个接口的实现需要识别特定请求的状态机,所以它们的入参应该是请求。为了兼容不同类型的请求消息,我们采用了Java泛型编程。接口定义如下:\u003C\u002Fp\u003E\u003Cdiv class=\&highlight\&\u003E\u003Cpre\u003E\u003Ccode class=\&language-text\&\u003E\u003Cspan\u003E\u003C\u002Fspan\u003Epublic interface QueueCoordinator&T& {\n\n
public boolean enQueue(T key);\n\n
public void deQueue(T key);\n\n}\n\u003C\u002Fcode\u003E\u003C\u002Fpre\u003E\u003C\u002Fdiv\u003EenQueue操作\u003Cp\u003EenQueue操作过程如下:\u003Cbr\u003E首先,根据传入的请求key值,获取状态机, 如果不存在则创建一个新的状态机,并保存在ConcurrentHashMap中。\u003Cbr\u003E接下来,获取线程id作为该消费者的唯一标识,并对对应状态机进行enQueue操作。\u003Cbr\u003E如果状态机返回值为ACCEPT或者DECLINE,返回业务层处理代码,ACCEPT意味着业务层需要处理该消息,DECLINE表示业务层可以抛弃当前消息。如果状态机返回值为Block,则该线程保持等待状态。\u003Cbr\u003E异常处理。在某些情况下,头部请求线程可能由于异常,未能对状态机进行deQueue操作(作为组件提供方,不能假定所有的规范被使用方实施)。为了避免处于阻塞状态的消费者无期限地等待,建议对状态机设置安全超时时限。超过了一定时间后,状态机强制清空头部请求,返回到业务层,业务层开始处理该请求。\u003Cbr\u003E代码如下:\u003C\u002Fp\u003E\u003Cdiv class=\&highlight\&\u003E\u003Cpre\u003E\u003Ccode class=\&language-text\&\u003E\u003Cspan\u003E\u003C\u002Fspan\u003Epublic boolean enQueue(T key) {\n
_loggingStastic();\n\n
TrafficAutomate trafficAutomate = key2Status.get(key);\n
if(trafficAutomate == null)\n
trafficAutomate = new TrafficAutomate();\n
TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);\n
if(oldAutomate != null)\n
trafficAutomate = oldA\n
long threadId = Thread.currentThread().getId();\n\n
ActionEnum action = trafficAutomate.enQueue(threadId);\n\n
if(action == DECLINE)\n
else if (action == ACCEPT)\n
\u002F\u002FBlocking status means some other thread are working on this key, so just wait till timeout\n
long start = System.currentTimeMillis();\n
long span = 0;\n
_nonExceptionSleep(NAP_TIME_IN_MILL);\n\n
if(trafficAutomate.isHead(threadId))\n
span = System.currentTimeMillis() -\n
}while(span &= timeout);\n\n
\u002F\u002Fremove head so that it won't block the queue for too long\n
trafficAutomate.evictHeadByForce(threadId);\n\\n}\n\u003C\u002Fcode\u003E\u003C\u002Fpre\u003E\u003C\u002Fdiv\u003EdeQueue操作\u003Cp\u003EdeQueue操作首先从ConcurrentHashMap获取改请求所对应的状态机,接着获取该线程的线程id,对状态机进行deQueue操作。\u003Cbr\u003EenQueue代码如下:\u003C\u002Fp\u003E\u003Cdiv class=\&highlight\&\u003E\u003Cpre\u003E\u003Ccode class=\&language-text\&\u003E\u003Cspan\u003E\u003C\u002Fspan\u003Epublic void deQueue(T key) {\n
TrafficAutomate trafficAutomate = key2Status.get(key);\n\n
if(trafficAutomate == null)\n
logger.error(\&key {} doesn't exist \&, key);\\n
long threadId = Thread.currentThread().getId();\n\n
trafficAutomate.deQueue(threadId);\n}\n\u003C\u002Fcode\u003E\u003C\u002Fpre\u003E\u003C\u002Fdiv\u003E\u003Ch4\u003E源代码\u003C\u002Fh4\u003E\u003Cp\u003E完整源代码可以在\u003Ca href=\&http:\u002F\u002Flink.zhihu.com\u002F?target=https%3A\u002F\u002Fgithub.com\u002FdinglauFQueueCoordinator\u002Ftree\u002Fmaster\u002Fsrc\& class=\& wrap external\& target=\&_blank\& rel=\&nofollow noreferrer\&\u003EQueueCoordinator\u003C\u002Fa\u003E获取。\u003C\u002Fp\u003E\u003Ch1\u003E参考资料\u003C\u002Fh1\u003E\u003Cp\u003E[1] Rabbit MQ, \u003Ca href=\&http:\u002F\u002Flink.zhihu.com\u002F?target=https%3A\u002F\u002Fwww.rabbitmq.com\u002Fha.html\& class=\& wrap external\& target=\&_blank\& rel=\&nofollow noreferrer\&\u003EHighly Available Queues\u003C\u002Fa\u003E.\u003Cbr\u003E[2] IBM Knowledge Center, \u003Ca href=\&http:\u002F\u002Flink.zhihu.com\u002F?target=https%3A\u002F\u002Fwww.ibm.com\u002Fsupport\u002Fknowledgecenter\u002FSSFKSJ_8.0.0\u002Fcom.ibm.mq.pro.doc\u002Fq002620_.htm\& class=\& wrap external\& target=\&_blank\& rel=\&nofollow noreferrer\&\u003EIntroduction to message queuing\u003C\u002Fa\u003E.\u003Cbr\u003E[3] Wikipedia, \u003Ca href=\&http:\u002F\u002Flink.zhihu.com\u002F?target=https%3A\u002F\u002Fen.wikipedia.org\u002Fwiki\u002FSerializability\& class=\& wrap external\& target=\&_blank\& rel=\&nofollow noreferrer\&\u003ESerializability\u003C\u002Fa\u003E.\u003Cbr\u003E[4] Hadoop, \u003Ca href=\&http:\u002F\u002Flink.zhihu.com\u002F?target=https%3A\u002F\u002Fzookeeper.apache.org\u002Fdoc\u002Ftrunk\u002Frecipes.html%23sc_leaderElection\& class=\& wrap external\& target=\&_blank\& rel=\&nofollow noreferrer\&\u003EZooKeeper Recipes and Solutions\u003C\u002Fa\u003E.\u003Cbr\u003E[5] \u003Ca href=\&http:\u002F\u002Flink.zhihu.com\u002F?target=http%3A\u002F\u002Fkafka.apache.org\u002Fdocumentation.html%23introduction\& class=\& wrap external\& target=\&_blank\& rel=\&nofollow noreferrer\&\u003EApache Kafka\u003C\u002Fa\u003E.\u003Cbr\u003E[6] Lamport L, \u003Ca href=\&http:\u002F\u002Flink.zhihu.com\u002F?target=http%3A\u002F\u002Fresearch.microsoft.com\u002Fen-us\u002Fum\u002Fpeople\u002Flamport\u002Fpubs\u002Fpaxos-simple.pdf\& class=\& wrap external\& target=\&_blank\& rel=\&nofollow noreferrer\&\u003EPaxos Made Simple\u003C\u002Fa\u003E.\u003C\u002Fp\u003E&,&updated&:new Date(&T09:52:25.000Z&),&canComment&:false,&commentPermission&:&anyone&,&commentCount&:5,&collapsedCount&:0,&likeCount&:134,&state&:&published&,&isLiked&:false,&slug&:&&,&isTitleImageFullScreen&:false,&rating&:&none&,&titleImage&:&&,&links&:{&comments&:&\u002Fapi\u002Fposts\u002F2Fcomments&},&reviewers&:[],&topics&:[{&url&:&https:\u002F\u002Fwww.zhihu.com\u002Ftopic\u002F&,&id&:&&,&name&:&分布式系统&},{&url&:&https:\u002F\u002Fwww.zhihu.com\u002Ftopic\u002F&,&id&:&&,&name&:&消息队列&}],&adminClosedComment&:false,&titleImageSize&:{&width&:0,&height&:0},&href&:&\u002Fapi\u002Fposts\u002F&,&excerptTitle&:&&,&tipjarState&:&closed&,&annotationAction&:[],&sourceUrl&:&&,&pageCommentsCount&:5,&hasPublishingDraft&:false,&snapshotUrl&:&&,&publishedTime&:&T17:52:25+08:00&,&url&:&\u002Fp\u002F&,&lastestLikers&:[{&bio&:&&,&isFollowing&:false,&hash&:&98ab01f232fbb&,&uid&:80,&isOrg&:false,&slug&:&alan-wang-zzz&,&isFollowed&:false,&description&:&&,&name&:&吃个小烧饼&,&profileUrl&:&https:\u002F\u002Fwww.zhihu.com\u002Fpeople\u002Falan-wang-zzz&,&avatar&:{&id&:&f966df4ee12eaf84199f&,&template&:&https:\u002F\u002Fpic3.zhimg.com\u002F{id}_{size}.jpg&},&isOrgWhiteList&:false,&isBanned&:false},{&bio&:&熟读林诗三百首,不会念诗也会苟。&,&isFollowing&:false,&hash&:&3fc8eecc1bb420&,&uid&:84,&isOrg&:false,&slug&:&mei-you-ren-87&,&isFollowed&:false,&description&:&&,&name&:&Asterisk&,&profileUrl&:&https:\u002F\u002Fwww.zhihu.com\u002Fpeople\u002Fmei-you-ren-87&,&avatar&:{&id&:&v2-004d540c992da057ef45d0&,&template&:&https:\u002F\u002Fpic2.zhimg.com\u002F{id}_{size}.jpg&},&isOrgWhiteList&:false,&isBanned&:false},{&bio&:&精通C\u002FC++\u002FJava\u002FPython\u002FGo\u002FJs等语言的hello world程序编写&,&isFollowing&:false,&hash&:&d3f1b1f24af3d563c036&,&uid&:72,&isOrg&:false,&slug&:&puyangsky&,&isFollowed&:false,&description&:&https:\u002F\u002Fgithub.com\u002Fpuyangsky\nhttp:\u002F\u002Fcnblogs.com\u002Fpuyangsky&,&name&:&puyangsky&,&profileUrl&:&https:\u002F\u002Fwww.zhihu.com\u002Fpeople\u002Fpuyangsky&,&avatar&:{&id&:&v2-f9b1e0dbdadc5a5eb779&,&template&:&https:\u002F\u002Fpic3.zhimg.com\u002F{id}_{size}.jpg&},&isOrgWhiteList&:false,&isBanned&:false},{&bio&:&不优雅的人&,&isFollowing&:false,&hash&:&9c30c6f4c855e609eaafc9c&,&uid&:76,&isOrg&:false,&slug&:&li-wen-chao-99&,&isFollowed&:false,&description&:&希望和天下人交朋友的人,具有Geek精神的app开发者,用互联网思维看世界,把生活和工作的每一件事做到极致.&,&name&:&李文超&,&profileUrl&:&https:\u002F\u002Fwww.zhihu.com\u002Fpeople\u002Fli-wen-chao-99&,&avatar&:{&id&:&v2-c10cac81ad3aa6c9df05&,&template&:&https:\u002F\u002Fpic1.zhimg.com\u002F{id}_{size}.jpg&},&isOrgWhiteList&:false,&isBanned&:false},{&bio&:&研发工程师&,&isFollowing&:false,&hash&:&fcba0d532b009dade0b20&,&uid&:52,&isOrg&:false,&slug&:&long-gang-62-42&,&isFollowed&:false,&descript

我要回帖

更多关于 java 分布式任务队列 的文章

 

随机推荐