tutorial for amqp producer v.1

This commit is contained in:
Артем Синюгин 2024-10-20 19:16:49 +03:00
parent 36f43420e0
commit 6acf8375ce
2 changed files with 105 additions and 0 deletions

8
Cargo.toml Normal file
View File

@ -0,0 +1,8 @@
[package]
name = "rabbit_connect_tutorial"
version = "0.1.0"
edition = "2021"
[dependencies]
amqprs = "2.1.0"
tokio = { version = "1.40.0", features = ["rt", "rt-multi-thread", "macros"]}

97
src/main.rs Normal file
View File

@ -0,0 +1,97 @@
use amqprs::{
channel::{BasicPublishArguments, Channel, QueueDeclareArguments}, connection::{Connection, OpenConnectionArguments}, BasicProperties
};
use tokio;
#[derive(Default)]
struct RabbitConfig {
ip: String,
port: u16,
user: String,
pass: String,
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let config = RabbitConfig::default();
let connection_with_channel = establish_rabbit_conn(config).await.expect("No connection with channel created");
handler(connection_with_channel, "tasks".to_string()).await;
Ok(())
}
async fn establish_rabbit_conn(config: RabbitConfig) -> core::result::Result<Channel, amqprs::error::Error> {
// Channel - то, что мы будем возвращать в этой функции https://docs.rs/amqprs/2.1.0/amqprs/channel/struct.Channel.html
// Connection. Если поймем, что Channel нецелесообразно возвращать, вернём коннекшн https://docs.rs/amqprs/2.1.0/amqprs/connection/struct.Connection.html
// QueueDeclareArguments также нам потребуется https://docs.rs/amqprs/2.1.0/amqprs/channel/struct.QueueDeclareArguments.html
// Почему может понадобиться использовать Connection. Потому что иногда для каждого пользователя создаётся отдельный канал (типа каналы они лёгкие и для примерно для этого созданы)
// Но единственный аргумент, чтобы не использовать один канал для всех, что я нашёл - в некоторых приложениях каналы не потокобезопасны.
// Если Mutex c Arc обеспечат нам потокобезопасность, можно попробовать использовать один канал на всех.
// Коннектимся к серверу RabbitMQ
let connection = Connection::open(&OpenConnectionArguments::new(
&config.ip,
config.port,
&config.user,
&config.pass,
))
.await?;
// Строчка ниже отвечает за какое-то внутреннее логирование, чтобы в случае ошибки было удобно отследить причину.
// Но как эта ошибка будет выводится в лог actix web - хз. Я бы по этой причине лучше бы использовал крейт log
// connection.register_callback(DefaultConnectionCallback).await.unwrap();
// открываем канал с идентификатором channel_id. Далее проще сослаться на документацию:
// channel_id range: 1 to 65535.
// Automatically generate an id if input channel_id = None, otherwise, use the given input id.
connection.open_channel(None).await
// ещё далее тут может быть строчка для отслеживания ошибок. Так как в actix web пока не используем, комментируем
// channel.register_callback(DefaultChannelCallback).await.unwrap();
}
async fn handler(channel: Channel, queue_name: String) {
// строчкой c queue_declare убеждаемся, что очередь с queue_name существует, но перед этим
// сообщаем ряд параметров через q_args, в чём именно мы хотим убедиться https://www.rabbitmq.com/amqp-0-9-1-reference#queue.declare
let q_args = QueueDeclareArguments::default()
// сообщаем, что мы хотим публиковать в очередь tasks. Если не обозначили, то в каких-то случаях Rabbit сам случайно назначает на имя очереди
.queue(queue_name)
// через passive мы хотим удостоверится, что очередь существует на сервере amqp, иначе возвращается ошибка
.passive(true)
// durable очередь обозначает, что очередь по-прежнему будет существовать после рестарта сервера amqp
.durable(true)
.finish();
let (queue_name, _, _) = channel.queue_declare(q_args).await.unwrap().unwrap();
// здесь может быть блок настройки exchange. Очень важная штука, но там есть настройки по умолчанию, пока с ними, чтобы не утяжелять логику.
// пока будем использовать дефолтный exchange с пустой строкой "" https://docs.rs/amqprs/2.1.0/amqprs/channel/struct.QueueBindArguments.html
// переходим к отправке. Здесь в BasicPublishArguments указываем, какой exchage с какими параметрами используем, а также BasicProperties с указанными свойствами
// самих сообщений. BasicPublishArguments https://docs.rs/amqprs/2.1.0/amqprs/channel/struct.BasicPublishArguments.html#method.mandatory
// здесь также желательноmandatory=true, чтобы брокер вернул сообщение обратно, если не найдёт подходящую очередь, но так как мы используем дефолтный exchange,
// то оставляем лишь базу
let exchange_args = BasicPublishArguments::new("", &queue_name)
// .mandatory(true)
;
// BasicProperties https://docs.rs/amqprs/2.1.0/amqprs/struct.BasicProperties.html
let message_props = BasicProperties::default()
// сообщения будут записываться на жёсткий диск и при перезагрузке брокера не будут теряться
.with_persistence(true)
.finish();
// мы готовы отправлять сообщение. подготавливаем само сообщение.
let content = String::from(
r#"
{
"publisher": "example"
"data": "Hello, amqprs!"
}
"#,
)
.into_bytes();
// отправляем его с созданными настройками
// как я понял, ecли бы у нас стояло mandatory(true), то конструкция ниже вернула бы ответ OK, который можно было бы обработать. Но пока без этого
channel
.basic_publish(message_props, content, exchange_args)
.await
.unwrap();
}