Thread-safe Python RabbitMQ Client & Management library

Overview

AMQPStorm

Introduction

AMQPStorm is a library designed to be consistent, stable and thread-safe.

  • 100% Test Coverage!
  • Supports Python 2.7 and Python 3.3+.
  • Fully tested against the CPython and PyPy implementations.

Documentation

Additional documentation is available on amqpstorm.io.

Changelog

Version 2.10.4

  • Fixed issue with a forcefully closed channel not sending the appropriate response [#114] - Thanks Bernd Höhl.

Version 2.10.3

  • Fixed install bug with cp1250 encoding on Windows [#112] - Thanks ZygusPatryk.

Version 2.10.2

  • Fixed bad socket fd causing high cpu usage [#110] - Thanks aiden0z.

Version 2.10.1

  • Fixed bug with UriConnection not handling amqps:// properly.
  • Improved documentation.

Version 2.10.0

  • Added Pagination support to Management list calls (e.g. queues list).
  • Added Filtering support to Management list calls.
  • Re-use the requests sessions for Management calls.
  • Updated to use pytest framework instead of nose for testing.

Version 2.9.0

  • Added support for custom Message implementations - Thanks Jay Hogg.
  • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
  • Re-worked the channel re-use code.

Version 2.8.5

  • Fixed a potential deadlock when opening a channel with a broken connection [#97] - Thanks mehdigmira.

Version 2.8.4

  • Fixed a bug in Message.create where it would mutate the properties dict [#92] - Thanks Killerama.

Version 2.8.3

  • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
  • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.

Version 2.8.2

  • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
  • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.

Version 2.8.1

  • Cleaned up documentation.

Version 2.8.0

  • Introduced a new channel function called check_for_exceptions.
  • Fixed issue where publish was successful but raised an error because the connection was closed [#80] - Thanks Pavol Plaskoň.
  • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
  • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.

Version 2.7.2

  • Added ability to override client_properties [#77] - Thanks tkram01.

Version 2.7.1

  • Fixed Connection close taking longer than intended when using SSL [#75] - Thanks troglas.
  • Fixed an issue with closing Channels taking too long after the server initiated it.

Version 2.7.0

  • Added support for passing your own ssl context [#71] - Thanks troglas.
  • Improved logging verbosity on connection failures [#72] - Thanks troglas.
  • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.

Version 2.6.2

  • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
  • Internal RPC Timeout for Opening and Closing Connections is now set to a fixed 30s [#67] - Thanks josemonteiro.

Version 2.6.1

  • Fixed minor issue with the last channel id not being available.

Version 2.6.0

  • Re-use closed channel ids [#55] - Thanks mikemrm.
  • Changed Poller Timeout to be a constant.
  • Improved Connection Close performance.
  • Channels is now a publicly available variable in Connections.

Version 2.5.0

  • Upgraded pamqp to v2.0.0.
  • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
  • Fixed issue with Management queue/exchange declare when the passive flag was set to True.

Credits

Special thanks to gmr (Gavin M. Roy) for creating pamqp. In addition, amqpstorm is heavily influenced by his pika and rabbitpy libraries.

Comments
  • Out of order ack-ing?

    Whoooo, more weirdness!

    .Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 5, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 5
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-23 - INFO - Timeout watcher loop. Current message counts: 0 (out: 0, in: 0)
    Main.Connector.Container(/rpcsys).Thread-23 - INFO - Interface timeout thread. Ages: heartbeat -> 4.83, last message -> 32.51.
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 6, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 6
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': True, 'delivery_tag': 7, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 7
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Error while in rx runloop!
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -       Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Traceback (most recent call last):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.interface.process_rx_events()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.storm_channel.process_data_events(to_tuple=False)
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     for message in self.build_inbound_messages(break_on_empty=True):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.check_for_errors()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     raise exception
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -
    Main.Connector.Internal(/rpcsys).Thread-21 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-21 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
    Main.Connector.Internal(/rpcsys).Thread-23 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Disconnecting!
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing channel
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing connection
    Running state: True, thread alive: True, thread id:140037440014080
    Joining _inbound_thread. Runstate: %s False
    
    

    Context:

    I have a connection with a thread processing the receiving messages. I have instrumented Message.ack() with a print statement that prints the delivery tag that it's acking.

    It appears I'm calling [delivery tag 6].ack(), [delivery tag 7].ack(), and somehow the ack for delivery tag 7 is getting received by the rabbitmq server /first/, resulting in a PRECONDITION_FAILED error because acking 7 implicitly acks previous tags, and therefore 6 is not a valid delivery tag anymore.
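
    The failure mode described above can be illustrated with a toy model of the broker's bookkeeping. This is a sketch only, not amqpstorm or RabbitMQ code: ToyChannel and UnknownDeliveryTag are hypothetical names, and the model assumes the ack for tag 7 carries multiple=True (AMQP 0-9-1 basic.ack semantics), which is what would settle tag 6 implicitly.

```python
class UnknownDeliveryTag(Exception):
    """Stands in for the broker's PRECONDITION_FAILED channel error."""


class ToyChannel:
    """Hypothetical model of the broker-side set of unacked deliveries."""

    def __init__(self):
        self.unacked = set()

    def deliver(self, tag):
        self.unacked.add(tag)

    def ack(self, tag, multiple=False):
        if tag not in self.unacked:
            raise UnknownDeliveryTag('unknown delivery tag %d' % tag)
        if multiple:
            # Settle every outstanding tag up to and including this one.
            self.unacked = {t for t in self.unacked if t > tag}
        else:
            self.unacked.discard(tag)


channel = ToyChannel()
for tag in (5, 6, 7):
    channel.deliver(tag)

channel.ack(5)
channel.ack(7, multiple=True)   # arrives first and also settles tag 6
try:
    channel.ack(6)              # the late ack now refers to a settled tag
    outcome = 'ok'
except UnknownDeliveryTag as why:
    outcome = str(why)
print(outcome)
```

    If the acks were sent with multiple=False, the same reordering would be harmless in this model, so the flag is the first thing worth checking.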

    I'm working on pulling out a testable example, but it's certainly odd.


    Incidentally, the new docs pages are fancypants!

    wontfix 
    opened by fake-name 49
  • Add the ability to forcefully close a connection.

    Basically, I'm dealing with a context where I have a high volume traffic AMQP connection, across a high-latency, unreliable link (intercontinental).

    If there is a way for a connection to possibly wedge, I'll encounter it.

    Anyway, the issue I ran into here is that the "close" operation on a connection can wedge indefinitely, preventing the connection from actually closing (I /think/ this happens if the shutdown RPC request gets garbled or eaten somewhere).

    The question of /how/ this happens aside, I need to be able to kill an open connection in a dirty manner. This adds the ability to kill() a connection, which forces the worker thread to exit immediately, without bothering to do any proper cleanup.

    Because this is an operation that's generally done with an additional watcher process, outside of the normal program flow, I used multiprocessing primitives to manage the _die flag (at one point, the watcher was a separate /process/, not a thread).
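
    The idea of a kill flag checked by the worker loop can be sketched roughly as follows. This is not the code from the patch: Worker, its methods and the polling placeholder are hypothetical. The sketch defaults to threading.Event and accepts any object with set() and is_set(), such as multiprocessing.Event, which is the kind of primitive the description above mentions.

```python
import threading
import time


class Worker:
    """Hypothetical worker whose loop can be stopped without cleanup."""

    def __init__(self, die_flag=None):
        # Any object with set()/is_set() works, e.g. multiprocessing.Event().
        self._die = die_flag if die_flag is not None else threading.Event()
        self.iterations = 0
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        while not self._die.is_set():
            self.iterations += 1      # placeholder for polling the socket
            time.sleep(0.01)

    def kill(self):
        """Make the loop exit at its next check; no close handshake is sent."""
        self._die.set()

    def join(self, timeout=None):
        self._thread.join(timeout)
        return not self._thread.is_alive()


worker = Worker()
worker.start()
time.sleep(0.05)
worker.kill()
stopped = worker.join(timeout=1.0)
print('worker stopped:', stopped)
```

    The trade-off is the one described above: the loop stops promptly, but nothing is flushed or closed cleanly.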

    Anyway, I'm not sure if this is in line with the ideas of the library, but I haven't been able to wedge the AMQP connection with this patch set, so that's something.


    Only tested on Py3.5x64, Ubuntu 14.04.

    Things I haven't done: Additional unit tests.

    opened by fake-name 17
  • SSL Retry

    I regularly get [SSL: BAD_WRITE_RETRY] bad write retry (_ssl.c:1647) errors when running AMQP-Storm using SSL.

    Looking at the code, I think it's because it's not handling SSL retry exceptions from the socket.

    http://stackoverflow.com/questions/2997218/why-am-i-getting-error1409f07fssl-routinesssl3-write-pending-bad-write-retr

    I'll get a stack trace to see if it's on write or on do_handshake.
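
    One way to handle such retry signals on a non-blocking SSL socket is sketched below. It is an illustration under assumptions, not amqpstorm's implementation: write_with_retry and max_attempts are hypothetical names, and the sketch deliberately retries with the same pending chunk, on the understanding that OpenSSL reports a bad write retry when a pending write is repeated with different data.

```python
import ssl


def write_with_retry(sock, data, max_attempts=5):
    """Send all of `data`, retrying when the SSL layer asks to try again."""
    view = memoryview(data)
    sent = 0
    while sent < len(view):
        chunk = view[sent:]          # kept unchanged across retries
        attempts = 0
        while True:
            try:
                sent += sock.send(chunk)
                break
            except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                attempts += 1
                if attempts >= max_attempts:
                    raise
                # A real client would wait on select()/poll() here
                # instead of retrying immediately.
    return sent
```

    The function only needs an object with a send() method, so it can be exercised with a stub socket before trying it against a real ssl.SSLSocket.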

    bug 
    opened by thejuan 17
  • reached the maximum number of channels raised with closed channels

    Hello,

    I've run into an issue with creating new channels: I receive the error "reached the maximum number of channels 65535" when attempting to create a new channel.

    After some digging, I noticed Connection._get_next_available_channel_id() accounts for all channels, both open and closed. I believe filtering the count for just opened should resolve this issue.

    I tested with a quick fix

        def _get_next_available_channel_id(self):
            channel_id = len(self._channels) + 1
            active_channels = [
                ch for ch in list(self._channels.values()) if ch and ch.is_open
            ]
            if len(active_channels) >= self.max_allowed_channels:
                raise AMQPConnectionError(
                    'reached the maximum number of channels %d' %
                    self.max_allowed_channels)
            return channel_id
    

    However, it may be better to just keep an active count.
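
    As a rough illustration of the alternative, an allocator can track only the ids currently in use and hand out the lowest free one, so closed channels stop counting against the limit. ChannelIds and its methods are hypothetical and are not taken from amqpstorm.

```python
class ChannelIds:
    """Hypothetical allocator that hands out the lowest free channel id."""

    def __init__(self, max_allowed_channels=65535):
        self.max_allowed_channels = max_allowed_channels
        self._in_use = set()

    def acquire(self):
        if len(self._in_use) >= self.max_allowed_channels:
            raise RuntimeError(
                'reached the maximum number of channels %d' %
                self.max_allowed_channels)
        channel_id = 1
        while channel_id in self._in_use:
            channel_id += 1
        self._in_use.add(channel_id)
        return channel_id

    def release(self, channel_id):
        # Called when a channel closes, freeing its id for re-use.
        self._in_use.discard(channel_id)


ids = ChannelIds(max_allowed_channels=2)
first = ids.acquire()
second = ids.acquire()
ids.release(first)
reused = ids.acquire()
print(first, second, reused)
```

    The linear scan is fine for a sketch; a production version would more likely keep a heap or free list of released ids.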

    bug 
    opened by mikemrm 12
  • Unhandled Frames in 3.6.2

    After upgrading to RabbitMQ 3.6.2, I started to see the following errors, which I think lead to the RabbitMQ server using all of its memory.

    [Channel%d] Unhandled Frame: %s -- %s

    No code changes, just a server upgrade. Not sure if this is amqpstorm or RabbitMQ.

    opened by thejuan 12
  • Exception: 'Deliver' object has no attribute 'body_size'

    I have been experiencing a strange error. For some messages only, I get this error:

    'Deliver' object has no attribute 'body_size' channel.py (line: 253)

    I looked at self._inbound and I saw some Deliver objects in sequence instead of Deliver/ContentHeader/ContentBody
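
    To make the expected ordering concrete, here is a small sketch of assembling messages from an inbound frame list. It is a simplified model, not amqpstorm's code: frames are plain (kind, payload) tuples, and assemble is a hypothetical helper that raises as soon as a Deliver is not followed by a ContentHeader.

```python
def assemble(frames):
    """Group (kind, payload) frames into message bodies.

    Expects Deliver, then ContentHeader carrying the body size, then
    ContentBody frames until that many bytes have arrived.
    """
    messages = []
    index = 0
    while index < len(frames):
        kind = frames[index][0]
        if kind != 'Deliver':
            raise ValueError('expected Deliver, got %s' % kind)
        has_header = (index + 1 < len(frames) and
                      frames[index + 1][0] == 'ContentHeader')
        if not has_header:
            raise ValueError(
                'Deliver at position %d has no ContentHeader' % index)
        body_size = frames[index + 1][1]
        index += 2
        body = b''
        while len(body) < body_size:
            if index >= len(frames) or frames[index][0] != 'ContentBody':
                raise ValueError('message body is incomplete')
            body += frames[index][1]
            index += 1
        messages.append(body)
    return messages


good = [('Deliver', None), ('ContentHeader', 5),
        ('ContentBody', b'he'), ('ContentBody', b'llo')]
print(assemble(good))
```

    Feeding it two Deliver frames in a row reproduces the shape of the problem reported here, as an explicit error instead of an attribute lookup failure.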

    The messages are being sent to the queue by another project in another language, so I have no clue why it is happening.

    bug 
    opened by viniciuschiele 12
  • Can't publish multiple messages

    This is a weird one. I've upgraded to 2.2 and am trying to run the following script:

    import logging

    from amqpstorm import Message
    from amqpstorm import UriConnection

    logging.basicConfig(level=logging.DEBUG)

    keys = ["1", "2", "3"]
    # signer = Signer()
    bus = UriConnection("***")
    with bus.channel(rpc_timeout=10) as channel:
        # Enable publisher confirms on this channel.
        channel.confirm_deliveries()
        for key in keys:
            print(key)
            msg = "My Message"
            # properties = {"headers": {"md5-signature": signer.sign(msg)}}
            properties = {}  # the signed headers above are disabled
            Message.create(channel, msg, properties).publish(key, exchange="amq.topic")
    
    

    I get this error on the second publish.

    DEBUG:amqpstorm.connection:Connection Opening
    DEBUG:amqpstorm.channel0:Frame Received: Connection.Start
    DEBUG:amqpstorm.channel0:Frame Sent: Connection.StartOk
    DEBUG:amqpstorm.channel0:Frame Received: Connection.Tune
    DEBUG:amqpstorm.channel0:Frame Sent: Connection.TuneOk
    DEBUG:amqpstorm.channel0:Frame Sent: Connection.Open
    DEBUG:amqpstorm.channel0:Frame Received: Connection.OpenOk
    DEBUG:amqpstorm.heartbeat:Heartbeat Checker Started
    DEBUG:amqpstorm.connection:Connection Opened
    DEBUG:amqpstorm.connection:Opening a new Channel
    DEBUG:amqpstorm.connection:Channel #1 Opened
    1
    2
    DEBUG:amqpstorm.channel:Channel #1 Closing
    DEBUG:amqpstorm.channel:Channel #1 Closed
    Exception in thread amqpstorm.io:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.7/threading.py", line 754, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 219, in _process_incoming_data
        if self.poller.is_ready:
      File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 48, in is_ready
        except select.error as why:
    AttributeError: 'NoneType' object has no attribute 'error'

    bug 
    opened by thejuan 11
  • Testing with pamqp 3.0.0a4

    I plan on releasing pamqp 3.0 soon and wanted to make sure you were aware of the changes that are making it into 3.0 and give you an opportunity to report any issues prior to its release.

    Please see the version history @ https://pamqp.readthedocs.io/en/latest/history.html

    opened by gmr 10
  • potential infinite loop ?

    Hello,

    I am using your library in conjunction with Celery and I regularly encounter a problem: when asking Celery to restart its worker (by emitting a SIGTERM), the process blocks and Celery does not restart. Celery maintains a parent thread that runs child threads where tasks are executed. Amqpstorm's thread is also run from this parent thread. The processes in htop look like this:

    [Screenshot: htop process list, 2017-04-11 10-34-26]

    Here, processes 13880 and 13803 are related to amqpstorm: 13803 is the inbound_thread, while 13880 is the heartbeat timer.

    After some investigation, I found out that killing the thread responsible for the heartbeat timer allows Celery to restart gracefully. This led me to think that the timer could possibly create a deadlock.

    I have created a pull request based on this assumption and will try this branch on my setup. Can you tell me what you think of it? Do you see any other possible explanations for the error I see?

    opened by cp2587 10
  • Connection was closed by remote server: CONNECTION_FORCED

    Hello,

    We recently updated the library version to 2.1.3 (from 1.1.7) and we now face several errors we did not have previously. One of them is the following:

        self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/basic.py", line 194, in publish
        self._channel.write_frames(frames_out)
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 326, in write_frames
        self.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 169, in check_for_errors
        self._connection.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/connection.py", line 155, in check_for_errors
        raise self.exceptions[0]
    AMQPConnectionError: Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
    

    On the RabbitMQ server, we have the following log entry: client unexpectedly closed TCP connection

    Finally, here is the code I use to create an AMQP socket (this socket is then used by logging to send log records to the queue):

    class AMQPStormSocket(object):
    
        def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                     queue_is_durable, exchange_type, fallback_call):
    
            # create connection & channel
            self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
            self.channel = self.connection.channel()
    
            # create an exchange, if needed
            self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
            # create a queue, if needed
            self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
            # bind it
            self.channel.queue.bind(queue=queue, exchange=exchange)
    
            # needed when publishing
            self.exchange = exchange
    
            self.fallback_call = fallback_call
    
        def sendall(self, data):
            try:
                self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
            except Exception as e:
                self.fallback_call(e)
    
        def close(self):
            try:
                self.channel.close()
                self.connection.close()
            except Exception:
                pass
    

    Do you have any idea how to fix these errors?

    opened by cp2587 10
  • Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

    I use RabbitMQ, and all the exchanges and consumers were preconfigured with different options. Unfortunately, when I try to consume from a durable queue I get

    amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: NOT_FOUND - no previously declared queue

    If I try to declare a queue without specifying any params I get

    Channel 1 was closed by remote server: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue' in vhost 'lizziebot': received 'false' but current is 'true'

    The only way is to specify all the params, including ttl, autodelete, etc. I cannot use this library because I need the RabbitMQ structure to be configured outside the client ;-(
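
    A passive declare is one possible way around the inequivalent-argument error: with passive=True the broker only checks that the queue exists and does not compare durable, auto_delete or other arguments. The helper below is a sketch; consume_predeclared is a hypothetical name, and the keyword arguments passed to queue.declare and basic.consume are assumed to match the amqpstorm channel API, so check them against the library documentation.

```python
def consume_predeclared(channel, queue_name, on_message):
    """Start consuming from a queue that must already exist on the broker."""
    # passive=True asks the broker to confirm the queue exists without
    # creating it or comparing its configured options.
    channel.queue.declare(queue=queue_name, passive=True)
    channel.basic.consume(callback=on_message, queue=queue_name,
                          no_ack=False)
```

    With a real amqpstorm channel this would typically be followed by channel.start_consuming(); if the queue is missing, the passive declare fails with NOT_FOUND instead of silently creating it.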

    bug 
    opened by blackalegator 9
  • Is there any support for AMQP 1-0-0 ?

    Does the module support AMQP 1-0-0?

    I do not see anything regarding the supported version in the docs.

    If not, is there any plan to support it in the future?

    opened by ruffp 1
  • Installation errors with Python 3.10

    I am using Python3 version 3.10.4 on Ubuntu 18.04 LTS and encountering the following error when attempting to install amqpstorm:

    $ pip install amqpstorm
    Keyring is skipped due to an exception: module 'collections' has no attribute 'MutableMapping'
    Defaulting to user installation because normal site-packages is not writeable
    Collecting amqpstorm
      Using cached AMQPStorm-2.10.4.tar.gz (71 kB)
      Preparing metadata (setup.py) ... error
      error: subprocess-exited-with-error
    
      × python setup.py egg_info did not run successfully.
      │ exit code: 1
      ╰─> [20 lines of output]
          Traceback (most recent call last):
            File "<string>", line 2, in <module>
            File "<pip-setuptools-caller>", line 14, in <module>
            File "/usr/lib/python3/dist-packages/setuptools/__init__.py", line 12, in <module>
              import setuptools.version
            File "/usr/lib/python3/dist-packages/setuptools/version.py", line 1, in <module>
              import pkg_resources
            File "/usr/lib/python3/dist-packages/pkg_resources/__init__.py", line 77, in <module>
              __import__('pkg_resources.extern.packaging.requirements')
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/packaging/requirements.py", line 9, in <module>
              from pkg_resources.extern.pyparsing import stringStart, stringEnd, originalTextFor, ParseException
            File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
            File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
            File "<frozen importlib._bootstrap>", line 672, in _load_unlocked
            File "<frozen importlib._bootstrap>", line 632, in _load_backward_compatible
            File "/usr/lib/python3/dist-packages/pkg_resources/extern/__init__.py", line 43, in load_module
              __import__(extant)
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/pyparsing.py", line 943, in <module>
              collections.MutableMapping.register(ParseResults)
          AttributeError: module 'collections' has no attribute 'MutableMapping'
          [end of output]
    
      note: This error originates from a subprocess, and is likely not a problem with pip.
    error: metadata-generation-failed
    
    × Encountered error while generating package metadata.
    ╰─> See above for output.
    
    note: This is an issue with the package mentioned above, not pip.
    hint: See above for details.
    

    I have seen recommendations to upgrade to the latest version of requests when this happens, but I already have the latest version of requests installed. Here are the contents of my requirements.txt file:

    boto3==1.23.0
    botocore==1.26.0
    jmespath==1.0.0
    python-dateutil==2.8.2
    s3transfer==0.5.2
    six==1.16.0
    urllib3==1.26.9
    redis==4.3.1
    requests==2.27.1
    amqpstorm==2.10.4
    pyyaml==6.0
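
    The traceback above fails inside the pyparsing copy vendored under /usr/lib/python3/dist-packages/pkg_resources, which suggests the system setuptools rather than amqpstorm is what trips over Python 3.10. The short check below shows the underlying change: the abstract base classes live in collections.abc, and the old alias on the collections module was removed in Python 3.10.

```python
import collections
import collections.abc
import sys

# True only on interpreters that still expose the deprecated alias.
has_old_alias = hasattr(collections, 'MutableMapping')

# The supported location works on every Python 3 version.
new_location_works = issubclass(dict, collections.abc.MutableMapping)

print('Python %d.%d' % sys.version_info[:2])
print('collections.MutableMapping present:', has_old_alias)
print('collections.abc.MutableMapping usable:', new_location_works)
```

    If the first check prints False, any package that still refers to collections.MutableMapping needs to be upgraded before the install can proceed.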
    
    opened by ashleykleynhans 3
  • How does amqpstorm handle Rabbitmq memory alarms

    Hello,

    Thanks for your work on this library 👍 I have a question regarding Rabbitmq memory watermarks and the way amqpstorm handles them.

    Imagine the following:

    1. I have a process that keeps publishing messages (basically an infinite loop for this example).
    2. At a certain point a memory alarm is triggered on RMQ, and further publishes are blocked ("Writes on a blocked connection will time out or fail with an I/O write exception."). The publisher is also notified that the connection is blocked ("Compatible AMQP 0-9-1 clients will be [notified] when they are blocked and unblocked.").
    3. Then RabbitMQ flushes some of the data to disk and unblocks the publisher.

    What happens on the AMQPStorm side? Does it raise? Do we lose some messages? Does the block/unblock notification trigger some logic in AMQPStorm?
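
    Independently of how the library reacts, a publisher can guard itself by waiting while the connection reports being blocked. The sketch below assumes the connection object exposes a boolean is_blocked attribute that follows the broker's blocked/unblocked notifications; that attribute, as well as publish_when_unblocked and its parameters, should be treated as assumptions to verify against the amqpstorm documentation.

```python
import time


def publish_when_unblocked(connection, publish, poll_interval=0.5,
                           max_wait=30.0):
    """Call `publish()` once the connection is no longer blocked."""
    deadline = time.monotonic() + max_wait
    while connection.is_blocked:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                'connection still blocked after %.1fs' % max_wait)
        time.sleep(poll_interval)
    return publish()
```

    Here publish is any zero-argument callable, for example a lambda wrapping the actual publish call, so the waiting policy stays separate from the message being sent.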

    Thanks for the help

    Links: RMQ doc

    opened by mehdigmira 4
  • Asynchronous delivery confirmation

    Hi,

    First, thanks for the work.

    We are experiencing a slowdown when enqueuing messages with delivery confirmation enabled. Before we used this feature, enqueuing was quite fast; after enabling it, enqueuing became noticeably slower.

    I checked, and it seems that the library waits for the confirmation of each message, whereas the following blog post (https://blog.rabbitmq.com/posts/2011/02/introducing-publisher-confirms) seems to recommend enqueuing all the messages and then waiting for all the confirmations. Do you think there would be a way for amqpstorm to handle that? If you have some pointers, I could try an implementation.
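
    The bookkeeping behind batched confirms can be sketched without any broker. The model below is hypothetical and not part of amqpstorm: ConfirmTracker numbers publishes from 1, as the broker does once confirm mode is enabled on a channel, and settles either one sequence number or everything up to it when an ack arrives with multiple=True.

```python
class ConfirmTracker:
    """Hypothetical bookkeeping for publisher confirms received in bulk."""

    def __init__(self):
        self._next_seq = 1
        self.pending = {}

    def published(self, body):
        """Record a publish and return its sequence number."""
        seq = self._next_seq
        self.pending[seq] = body
        self._next_seq += 1
        return seq

    def on_ack(self, delivery_tag, multiple=False):
        """Settle one publish, or all up to delivery_tag; return the count."""
        if multiple:
            settled = [s for s in self.pending if s <= delivery_tag]
        else:
            settled = [delivery_tag] if delivery_tag in self.pending else []
        for seq in settled:
            del self.pending[seq]
        return len(settled)


tracker = ConfirmTracker()
for body in ('a', 'b', 'c', 'd', 'e'):
    tracker.published(body)

settled_first = tracker.on_ack(3, multiple=True)   # confirms 1, 2 and 3
settled_second = tracker.on_ack(5)                 # confirms only 5
print(settled_first, settled_second, sorted(tracker.pending))
```

    Anything left in pending after a timeout is a candidate for re-publishing, which is where the throughput gain over waiting per message would come from.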

    Thank you.

    enhancement 
    opened by antoinerabany 4
  • Robust Connection example

    Hi,

    This is more of a question than an issue - is it possible to be able to manage a long-lived connection such that a reconnection can be immediately attempted if the channel/connection is closed?

    I see the robust consumer example, but nothing for a connection that may be used just for publishing. I don't see the ability to add any callbacks for a closed connection, or a way to block the current thread until there's an error on the connection/channel; or maybe I'm just missing something.
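
    In the absence of callbacks, one common pattern is to reconnect lazily when a publish fails. The sketch below is generic and makes several assumptions: RobustPublisher is a hypothetical wrapper, connect is any callable returning an object with a publish(body, routing_key) method, and retry_on defaults to Exception (with amqpstorm one would presumably narrow it to the library's own error classes).

```python
import time


class RobustPublisher:
    """Hypothetical wrapper that reconnects before retrying a publish."""

    def __init__(self, connect, max_retries=3, delay=1.0,
                 retry_on=(Exception,)):
        self._connect = connect          # returns a channel-like object
        self._max_retries = max_retries
        self._delay = delay
        self._retry_on = retry_on
        self._channel = None

    def publish(self, body, routing_key):
        last_error = None
        for attempt in range(self._max_retries):
            try:
                if self._channel is None:
                    self._channel = self._connect()
                return self._channel.publish(body, routing_key)
            except self._retry_on as why:
                last_error = why
                self._channel = None     # force a fresh connection
                if attempt + 1 < self._max_retries:
                    time.sleep(self._delay)
        raise last_error
```

    This does not notice a dead connection until the next publish; detecting it earlier would need a periodic health check on top of this wrapper.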

    Thanks for any help in advance.

    opened by jmcarter 4
Releases(2.10.5)
  • 2.10.5(Aug 14, 2022)

    • Added support for bulk removing users with the Management Api.
    • Added support to get the Cluster Name using the Management Api.
    • Fixed ConnectionUri to default to port 5671 when using ssl [#119] - Thanks s-at-ik.
  • 2.10.4(Nov 20, 2021)

  • 2.10.3(Nov 4, 2021)

  • 2.10.2(Oct 22, 2021)

  • 2.10.1(Sep 29, 2021)

  • 2.10.0(Sep 7, 2021)

    • Added Pagination support to Management list calls (e.g. queues list).
    • Added Filtering support to Management list calls.
    • Re-use the requests sessions for Management calls.
    • Updated to use pytest framework instead of nose for testing.
  • 2.9.0(Jun 11, 2021)

    • Added support for custom Message implementations - Thanks Jay Hogg.
    • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
    • Re-worked the channel re-use code.
  • 2.8.5(May 26, 2021)

  • 2.8.4(Mar 16, 2021)

  • 2.8.3(Mar 16, 2021)

    • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
    • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.
  • 2.8.2(Oct 8, 2020)

    • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
    • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.
  • 2.8.1(Jun 27, 2020)

  • 2.8.0(Jun 9, 2020)

    • Introduced a new channel function called check_for_exceptions.
    • Fixed issue where publish was successful but raises an error because connection was closed [#80] - Thanks Pavol Plaskoň.
    • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
    • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.
  • 2.7.2(Dec 2, 2019)

  • 2.7.1(Jun 16, 2019)

    • Fixed Connection close taking longer than intended when using SSL [#75] - Thanks troglas.
    • Fixed an issue with closing Channels taking too long after the server initiated it.
  • 2.7.0(Apr 13, 2019)

    • Added support for passing your own ssl context [#71] - Thanks troglas.
    • Improved logging verbosity on connection failures [#72] - Thanks troglas.
    • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
  • 2.6.2(Feb 2, 2019)

    • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
    • Internal RPC Timeout for Opening and Closing Connections is now set to a fixed 30s [#67] - Thanks josemonteiro.
  • 2.6.1(Dec 28, 2018)

  • 2.6.0(Dec 28, 2018)

    • Re-use closed channel ids [#55] - Thanks mikemrm.
    • Changed Poller Timeout to be a constant.
    • Improved Connection Close performance.
    • Channels is now a publicly available variable in Connections.
  • 2.5.0(Nov 25, 2018)

    • Upgraded pamqp to v2.0.0.
      • Python 3 keys will now always be of type str.
      • For more information see https://pamqp.readthedocs.io/en/latest/history.html
    • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
    • Fixed issue with Management queue/exchange declare when the passive flag was set to True.
  • 2.4.2(Sep 1, 2018)

    • Added support for External Authentication - Thanks Bernd Höhl.
    • Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
    • LICENSE file now included in package - Thanks Tomáš Chvátal.
  • 2.4.1(Aug 29, 2018)

    • Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
    • We now raise an exception if the maximum allowed channel count is reached.
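The negotiation mentioned in 2.4.1 follows the AMQP 0-9-1 tuning step, where the server proposes limits for the channel count and frame size and a value of zero means "no specific limit". A minimal sketch of that rule is shown below. The function name `negotiate` and the sample numbers are illustrative, and the library's own code may differ.

```python
def negotiate(server_value, client_value):
    """Pick the lower of two limits, treating 0 as 'no limit'.

    If either side reports 0, the other side's value wins; if both
    report 0, the result stays 0 (still unlimited).
    """
    return min(server_value, client_value) or max(server_value, client_value)


# Example: a server allowing 2047 channels and a client allowing 65535.
channel_max = negotiate(2047, 65535)
# Example: a server with no frame-size limit and a client limit of 131072.
frame_max = negotiate(0, 131072)
```

The negotiated channel maximum is also what makes it possible to raise an exception once that many channels are open.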
  • 2.4.0(Jan 17, 2018)

  • 2.3.0(Nov 8, 2017)

    • Added delivery_tag property to message.
    • Added redelivered property to message [#41] - Thanks tkram01.
    • Added support for Management Api Healthchecks [#39] - Thanks Julien Carpentier.
    • Fixed incompatibility with Sun Solaris 10 [#46] - Thanks Giuliox.
    • Fixed delivery_tag being set to None by default [#47] - Thanks tkram01.
    • Exposed requests verify and certs flags to Management Api [#40] - Thanks Julien Carpentier.
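The `delivery_tag` and `redelivered` properties added in 2.3.0 are typically read inside a consumer callback. The sketch below assumes a message object exposing those two properties and an `ack()` method; the callback name `on_message` and the logging setup are illustrative choices.

```python
import logging

LOGGER = logging.getLogger(__name__)


def on_message(message):
    """Consumer callback that logs redeliveries before acknowledging."""
    if message.redelivered:
        # The broker has attempted to deliver this message before.
        LOGGER.warning('Message %s is a redelivery', message.delivery_tag)
    message.ack()
```

With a real channel this would be registered along the lines of `channel.basic.consume(on_message, 'simple_queue', no_ack=False)` followed by `channel.start_consuming()`, where the queue name is a placeholder.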
  • 2.2.2(Apr 23, 2017)

  • 2.2.1(Feb 22, 2017)

    • Fixed potential Channel leak [#36] - Thanks Adam Mills.
    • Fixed threading losing select module during python shutdown [#37] - Thanks Adam Mills.
  • 2.2.0(Nov 18, 2016)

    • Connection.close should now be more responsive.
    • Channels are now reset when re-opening an existing connection.
    • Re-wrote large portions of the test suite.
  • 2.1.4(Nov 3, 2016)

    • Added parameter to override auto-decode on incoming Messages - Thanks Travis Griggs.
    • Fixed a rare bug that could cause the consumer to get stuck if the connection unexpectedly dies - Thanks Connor Wolf.
  • 2.1.3(Sep 29, 2016)

  • 2.1.2(Sep 23, 2016)

Owner
Erik Olof Gunnar Andersson
The views expressed here are 100% mine and in no way reflect those of my employer.