RabbitMQ 로 celery메시지 보내기
NodeJS에서 celery RabbitMQ 로 메시지 보내기
아래는 celery 가 사용하는 rabbitMQ 로 보낼때 사용하는 code 이다.celery 는 기본적으로 queue 이름을 celery 로 exchange 를 celery 로 만들고, routingKey 는 없다.(empty)
'use strict'; const amqplib = require('amqplib'); const Logger = use('Logger'); class MsgIssuer { constructor() { // for test this.rabbitMqUrl = 'amqp://userid:userpw@192.168.10.136//'; } issueTask() { this._issuing('tasks.issueTask', {}, 'cbill0'); } _issuing(task, kwargs, idHead) { const open = amqplib.connect(this.rabbitMqUrl); const body = { id: `${idHead}-${(new Date()).getTime()}`, task: task, args: [], kwargs, retries: 0, timelimit: [null, null], } // Publisher const routingKey = 'celery'; open.then(function (conn) { return conn.createChannel(); }).then(function (ch) { // return ch.purgeQueue(routingKey); // to purge message in the queue return ch.assertQueue(routingKey) .then(function (ok) { return ch.sendToQueue( routingKey, Buffer.from(JSON.stringify(body)), { contentType: 'application/json', contentEncoding: 'UTF-8', deliveryMode: 1, }); }); }).catch(Logger.error); } } module.exports = MsgIssuer;
python 에서 celery RabbitMQ 로 보내기
import pika import json parameters = pika.URLParameters( "amqp://id:pass@14.14.117.10/%2F" ) connection = pika.BlockingConnection(parameters) ch = connection.channel() ch.queue_declare(queue="celery", durable=True) ch.basic_publish( exchange="celery", routing_key="celery", body=json.dumps( { "id": "myid", "task": "task.tasks.mytask", "args": [], "kwargs": { "mykey1": "123", "mykey2": "456", }, "retries": 0, } ), properties=pika.BasicProperties( content_type="application/json", content_encoding="utf-8" ), )
monitoring
web browser 에서 http://localhost:15672/ 로 접속해서 확인하자.(참고)여기서 queue 에 message 가 몇개 있는지 등도 알 수 있게 message 를 publish 할 수도 있고, 어떤 queue 가 현재 존재하는지 등도 알 수 있다.(Management Plugin — RabbitMQ)
sudo rabbitmq-plugins enable rabbitmq_management
The RabbitMQ management plugin이 제공하는 것
- HTTP-based API
- 관리(management)
- RabbitMQ nodes and clusters 에 대한 monitoring
- browser-based UI
- command line tool, rabbitmqadmin.
rabbitmqadmin
rabbitmqadmin 은 .py 이다. 그래서 python rabbitmqadmin 이런식으로 사용하면 된다. rabbitmq 가 설치되면 downlod 는 아래처럼 하면 받을 수 있다.curl http://localhost:15672/cli/rabbitmqadmin > rabbitmqadmin
queue, exchange 확인
python rabbitmqadmin.py list queues vhost name messages
python rabbitmqadmin.py list exchanges
# 그냥 rabbitmqctl 로도 가능하다
sudo rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
purge
rabbitmqadmin purge queue name=name_of_the_queue_to_be_purged
queue 삭제
python3 rabbitmqadmin.py delete queue name='celery'
그 외
다음 같은 것들을 할 수 있다.- Get a list of exchanges
- Get a list of queues, with some columns specified
- Get a list of queues, with all the detail we can take
- Connect to another host as another user
- Declare an exchange
- Declare a queue, with optional parameters
- Publish a message
- And get it back
- Export Configuration (Definitions)
- Import Configuration (Definitions), quietly
- Close all connections
See Also
- php 에서 Celery 사용하기
- 쿠...sal: [컴] RabbitMQ 자료들
- rabbitmqctl - Is it possible to view RabbitMQ message contents directly from the command line? - Stack Overflow
References
- GitHub - squaremo/amqp.node: AMQP 0-9-1 library and client for Node.JS
- RabbitMQ tutorial - "Hello World!" — RabbitMQ
- javascript - RabbitMQ Node JS Validate User ID - Stack Overflow, 2020-01-08
댓글 없음:
댓글 쓰기