Module io.asyn.amqp

Asynchronous AMQP/RabbitMQ driver.

Example app using the io.asyn.amqp module:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
 # Naive use case, demo only

 from fairways.taskflow import Chain
 from fairways.funcflow import FuncFlow as ff
 from fairways.decorators import (connection, entrypoint, use)
 from fairways.io.asyn import amqp
 from fairways.io.asyn.base import run_asyn

 def fetch_message(raw):
     """Turn an incoming raw message into a plain dict.

     Reads the ``body`` and ``headers`` attributes of ``raw`` and returns
     them under the ``"body"`` and ``"headers"`` keys.
     """
     print(f"Step1 ctx: {type(raw)} | {raw.body}")
     return {"body": raw.body, "headers": raw.headers}

 def transform_message(message):
     """Decorate the message body with asterisks.

     The body is converted with ``str()``, wrapped as ``*** <body> ***`` and
     handed to ``ff.weld`` together with the original message; the return
     value of ``ff.weld`` is returned as-is.
     """
     print(f'Step2 ctx: {type(message)} | {message["body"]}')
     # NOTE(review): str() on a bytes body yields its repr ("b'...'") --
     # confirm which type the driver delivers here.
     text = str(message["body"])
     override = {"body": f'*** {text} ***'}
     # Presumably ff.weld merges `override` into `message`; verify against
     # fairways.funcflow.
     return ff.weld(message, override)

 # The result of this function will be automatically
 # published to the selected exchange.
 # Do not forget to run the publisher loop later
 # with `amqp.producer.create_tasks_future`.
 @amqp.producer(exchange="fws-out")
 def relay_message(message):
     """Log the message and return it unchanged.

     The function is registered with ``amqp.producer`` for the "fws-out"
     exchange; the decorator is what acts on the returned value.
     """
     print(f'Step3 ctx: {type(message)} | {message["body"]}')
     return message

 def check_pub_result(result):
     """Print the type of ``result`` and pass it through untouched."""
     # Purely informational step: nothing about the result is modified.
     print(f"Step after publish: {type(result)}")
     return result

 def handle_error(err_info):
     """Report a failure on stdout and return an empty dict.

     Only the first 1000 characters of ``str(err_info)`` are printed, so an
     oversized failure description cannot flood the output.
     """
     summary = str(err_info)[:1000]
     print("ERROR: Something totally wrong!", summary)
     return {}

 # Processing pipeline: each `.then` step is given the next handler in order,
 # and `.catch` registers `handle_error` as the error handler.
 chain = (
     Chain("AMQP transformer")
     .then(fetch_message)
     .then(transform_message)
     .then(relay_message)
     .then(check_pub_result)
     .catch(handle_error)
 )

 # Mark the function as a consumer handler.
 # Consider it the main entrypoint of this app.
 # Run it later with `amqp.consumer.create_tasks_future`.
 @amqp.consumer(queue="fairways")
 def run(message):
     """Feed one incoming message into the chain and return its result.

     Registered with ``amqp.consumer`` for the "fairways" queue.
     """
     # The chain is lazy: it is started anew for every incoming message.
     outcome = chain(message)
     return outcome

 if __name__ == '__main__':
     # Start the loop for all consumers declared in the app ...
     consumers_future = amqp.consumer.create_tasks_future()
     # ... and the loop for all producers declared in the app.
     producers_future = amqp.producer.create_tasks_future()
     run_asyn([consumers_future, producers_future])

Module content