Post on 23-Jun-2015
description
Kaninchen & SchlangenRabbitMQ & Python
Markus Zapke-Gründemann11. DZUG-Tagung zu Zope, Plone und Python
MarkusZapke-Gründemann• Softwareentwickler seit 2001
• Schwerpunkt: Web Application Development mit Python und PHP
• Django, symfony & Zend Framework
• Freier Softwareentwickler und Berater seit 2008
• www.keimlink.de
Übersicht
• Warum Queues?
• AMQP
• RabbitMQ
• Python Bibliotheken
• Carrot
• Celery
Warum Queues?
• Entkoppeln von Informationsproduzenten und -konsumenten
• Asynchrone Verarbeitung
• Load Balancing
• Skalierbarkeit
AMQP
• Advanced Message Queuing Protocol
• Offenes Protokoll
• Plattformunabhängig
• Port 5673/tcp
• http://www.amqp.org/
AMQP
• Offener Standard für Messaging Middleware
• Virtual Hosts
• Exchange (durable oder auto-deleted)
• Binding
• Queue (durable oder auto-deleted)
Producer - Consumer
Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html
Fanout Exchange
Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html
Direct Exchange
Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html
Topic Exchange
Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html
RabbitMQ
• AMQP Message Broker
• Erlang
• Open Source
• Mitglied in der AMQP Working Group
• XMPP, SMTP, STOMP und HTTP (mit Adaptern)
• http://www.rabbitmq.com/
Virtual Host Access Controlmit RabbitMQ
$ rabbitmqctl add_user username secret$ rabbitmqctl add_vhost message-vhost$ rabbitmqctl set_permissions -p message-vhost username ".*" ".*" ".*"
Konfiguration
LesenSchreiben
Python Bibliotheken
• amqplib - Python AMQP Client
• carrot - AMQP Messaging Framework
• pika - Python AMQP Client
• txAMQP - Python AMQP Library für Twisted
Carrot
• AMQP Messaging Framework
• High-Level Interface
• Benutzt amqplib
• Serialisierung (Pickle, JSON, YAML)
• Autor: Ask Solem
• http://github.com/ask/carrot/
exchange = 'messaging'queue = 'mbox'routing_key = 'message'
conf.py
connection.pyfrom carrot.connection import BrokerConnection
conn = BrokerConnection(hostname='localhost', userid='username', password='secret', virtual_host='message-vhost')
import datetime
from carrot.messaging import Publisherfrom carrot.utils import gen_unique_id
from conf import exchange, queue, routing_keyfrom connection import conn
publisher = Publisher(connection=conn, exchange=exchange, routing_key=routing_key, queue=queue, serializer='pickle')data = {'message_id': gen_unique_id(), 'timestamp': datetime.datetime.now(), 'message': 'Lorem ipsum dolor sit amet.'}publisher.send(data)
publisher.py
consumer.pyfrom carrot.messaging import Consumer
from conf import exchange, queue, routing_keyfrom connection import conn
class MessageConsumer(Consumer): def receive(self, message_data, message): data = (message.delivery_tag, message_data['message'], message_data['message_id'], message_data['timestamp'].isoformat()) print 'Message %d "%s" with id %s sent at %s.' % data message.ack()
if __name__ == '__main__': consumer = MessageConsumer(connection=conn, queue=queue, exchange=exchange, routing_key=routing_key) consumer.wait()
CeleryDistributed Task Queue• Backends: RabbitMQ, STOMP, Redis, Ghetto
Queue
• Clustering mit RabbitMQ
• Webhooks
• Django-Integration (optional)
• Autor: Ask Solem
• http://github.com/ask/celery/
CeleryDistributed Task Queue
• Serialisierung (Pickle, JSON, YAML)
• Parallele Ausführung
• Zeitgesteuerte Ausführung
• SQLAlchemy, carrot, anyjson
Python Task# Task als Klassefrom celery.task import Taskfrom myproject.models import User
class CreateUserTask(Task): def run(self, username, password): User.create(username, password)
>>> from tasks import CreateUserTask>>> CreateUserTask().delay('john', 'secret')
# Task als Funktion mit Decoratorfrom celery.decorators import taskfrom myproject.models import User
@task()def create_user(username, password): User.create(username, password)
>>> from tasks import create_user>>> create_user.delay('john', 'secret')
Python Task@task() # Benutzt pickle, um das Objekt zu serialisieren.def check_means(user): return user.has_means()
>>> from tasks import check_means>>> result = check_means.delay(user)>>> result.ready() # Gibt True zurück wenn der Task beendet ist.False>>> result.result # Task ist noch nicht beendet, kein Ergebnis verfügbar.None>>> result.get() # Warten bis der Task fertig ist und Ergebnis zurückgeben.93.27>>> result.result # Jetzt ist ein Ergebnis da.93.27>>> result.successful() # War der Task erfolgreich?True
Python TaskKonfiguration
$ celeryd --loglevel=INFO
# celeryconfig.pyBROKER_HOST = "localhost"BROKER_PORT = 5672BROKER_USER = "myuser"BROKER_PASSWORD = "mypassword"BROKER_VHOST = "myvhost"
CELERY_RESULT_BACKEND = "database"CELERY_RESULT_DBURI = "mysql://user:password@host/dbname"
CELERY_IMPORTS = ("tasks", )
Zeitgesteuerter Task# Periodic taskfrom celery.decorators import periodic_taskfrom datetime import timedelta
@periodic_task(run_every=timedelta(seconds=30))def every_30_seconds(): print("Running periodic task!")
# crontabfrom celery.task.schedules import crontabfrom celery.decorators import periodic_task
@periodic_task(run_every=crontab(hour=7, minute=30, day_of_week=1))def every_monday_morning(): print("Execute every Monday at 7:30AM.")
$ celerybeat$ celeryd -B
Fotos der Titelfolie
• Kaninchen: Gidzy http://www.flickr.com/photos/gidzy/3614942864/
• Grüne Python: Frank Wouters - http://www.flickr.com/photos/frank-wouters/540417590/
Lizenz
Dieses Werk ist unter einem Creative Commons Namensnennung-Weitergabe unter gleichen
Bedingungen 3.0 Unported Lizenzvertrag lizenziert. Um die Lizenz anzusehen, gehen Sie bitte zu
http://creativecommons.org/licenses/by-sa/3.0/ oder schicken Sie einen Brief an Creative Commons, 171 Second Street, Suite 300, San Francisco, California
94105, USA.