publish message / 메시지 보내기 / publishing messages / celery 사용 / 메세지 / rabbitmq monitor
RabbitMQ 로 celery메시지 보내기
NodeJS에서 celery RabbitMQ 로 메시지 보내기
아래는 celery 가 사용하는 rabbitMQ 로 보낼때 사용하는 code 이다.
celery 는 기본적으로 queue 이름을 celery 로 exchange 를 celery 로 만들고, routingKey 는 없다.(empty)
'use strict';
const amqplib = require('amqplib');
const Logger = use('Logger');
class MsgIssuer {
constructor() {
// for test
this.rabbitMqUrl = 'amqp://userid:userpw@192.168.10.136//';
}
issueTask() {
this._issuing('tasks.issueTask', {}, 'cbill0');
}
_issuing(task, kwargs, idHead) {
const open = amqplib.connect(this.rabbitMqUrl);
const body = {
id: `${idHead}-${(new Date()).getTime()}`,
task: task,
args: [],
kwargs,
retries: 0,
timelimit: [null, null],
}
// Publisher
const routingKey = 'celery';
open.then(function (conn) {
return conn.createChannel();
}).then(function (ch) {
// return ch.purgeQueue(routingKey); // to purge message in the queue
return ch.assertQueue(routingKey)
.then(function (ok) {
return ch.sendToQueue(
routingKey,
Buffer.from(JSON.stringify(body)),
{
contentType: 'application/json',
contentEncoding: 'UTF-8',
deliveryMode: 1,
});
});
}).catch(Logger.error);
}
}
module.exports = MsgIssuer;
python 에서 celery RabbitMQ 로 보내기
import pika
import json
parameters = pika.URLParameters(
"amqp://id:pass@14.14.117.10/%2F"
)
connection = pika.BlockingConnection(parameters)
ch = connection.channel()
ch.queue_declare(queue="celery", durable=True)
ch.basic_publish(
exchange="celery",
routing_key="celery",
body=json.dumps(
{
"id": "myid",
"task": "task.tasks.mytask",
"args": [],
"kwargs": {
"mykey1": "123",
"mykey2": "456",
},
"retries": 0,
}
),
properties=pika.BasicProperties(
content_type="application/json", content_encoding="utf-8"
),
)
monitoring
web browser 에서 http://localhost:15672/ 로 접속해서 확인하자.(
참고)
여기서 queue 에 message 가 몇개 있는지 등도 알 수 있게 message 를 publish 할 수도 있고, 어떤 queue 가 현재 존재하는지 등도 알 수 있다.(
Management Plugin — RabbitMQ)
sudo rabbitmq-plugins enable rabbitmq_management
The RabbitMQ management plugin이 제공하는 것
- HTTP-based API
- 관리(management)
- RabbitMQ nodes and clusters 에 대한 monitoring
- browser-based UI
- command line tool, rabbitmqadmin.
rabbitmqadmin 은 .py 이다. 그래서 python rabbitmqadmin 이런식으로 사용하면 된다. rabbitmq 가 설치되면 downlod 는 아래처럼 하면 받을 수 있다.
curl http://localhost:15672/cli/rabbitmqadmin > rabbitmqadmin
queue, exchange 확인
python rabbitmqadmin.py list queues vhost name messages
python rabbitmqadmin.py list exchanges
# 그냥 rabbitmqctl 로도 가능하다
sudo rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
purge
rabbitmqadmin purge queue name=name_of_the_queue_to_be_purged
queue 삭제
python3 rabbitmqadmin.py delete queue name='celery'
그 외
다음 같은 것들을 할 수 있다.
- Get a list of exchanges
- Get a list of queues, with some columns specified
- Get a list of queues, with all the detail we can take
- Connect to another host as another user
- Declare an exchange
- Declare a queue, with optional parameters
- Publish a message
- And get it back
- Export Configuration (Definitions)
- Import Configuration (Definitions), quietly
- Close all connections
See Also
- php 에서 Celery 사용하기
- 쿠...sal: [컴] RabbitMQ 자료들
- rabbitmqctl - Is it possible to view RabbitMQ message contents directly from the command line? - Stack Overflow
References
- GitHub - squaremo/amqp.node: AMQP 0-9-1 library and client for Node.JS
- RabbitMQ tutorial - "Hello World!" — RabbitMQ
- javascript - RabbitMQ Node JS Validate User ID - Stack Overflow, 2020-01-08