RabbitMQ Python Version

2020-09-23  本文已影响0人  阿猫阿狗py

producer

# encoding=utf-8
import pika
import json

# Name of the exchange that every publish and queue binding in this module uses.
system_exchange = "exchange"

# Broker login. Both fields are blank placeholders in this listing and must be
# filled in with a real username and password before connecting.
credentials = pika.PlainCredentials('', '')

# Broker address: host, port, virtual host, credentials. Host and virtual host
# are blank placeholders. The port argument was missing entirely (a syntax
# error); 5672 is the standard AMQP port.
parameters = pika.ConnectionParameters('', 5672, '', credentials)


class MessageQueue(object):
    """
    Handle that pairs a channel with the name of one queue.

    Every method simply forwards to the channel using the stored queue name.
    """

    def __init__(self, channel, queue_name):
        """
        :param channel: channel object the queue operations are issued on
        :param queue_name: name of the queue this handle refers to
        """
        self.channel = channel
        self.queue_name = queue_name
        # Durability flag passed along whenever message_count() re-declares
        # the queue.
        self.durable = False

    def get_message(self):
        """
        Fetch a single message from the queue with ``basic_get``.

        :return: whatever ``basic_get`` returns - a sequence of method frame,
                 message properties and body
        """
        return self.channel.basic_get(self.queue_name)

    def message_count(self):
        """
        Report how many messages the broker says are waiting in the queue.

        The count is read from the reply to a ``queue_declare`` call, so the
        queue is (re-)declared as a side effect.

        :return: the ``message_count`` field of the declare reply
        """
        declare_reply = self.channel.queue_declare(self.queue_name, durable=self.durable)
        return declare_reply.method.message_count

    def set_consumer(self, callback, auto_ack=True):
        """
        Register ``callback`` as a consumer of this queue.

        :param callback: callable handed to ``basic_consume`` as
                         ``on_message_callback``
        :param auto_ack: forwarded to ``basic_consume`` unchanged
        :return: the consumer tag returned by ``basic_consume`` (one is
                 generated automatically because none is passed in)
        """
        # The tag used to be discarded; returning it lets the caller cancel
        # this consumer later.
        return self.channel.basic_consume(
            on_message_callback=callback,
            queue=self.queue_name,
            auto_ack=auto_ack,
        )


class MessageChannel(object):
    """
    One blocking connection plus one channel on the module's exchange.

    Construction connects to the broker straight away; call ``close()`` when
    finished.
    """

    def __init__(self):
        """Connect, open a channel, declare the exchange and set prefetch."""
        # Open the connection described by the module-level ``parameters``.
        self.connection = pika.BlockingConnection(parameters)
        # No channel number is passed, so pika picks the next available one.
        self.channel = self.connection.channel()
        # Creates the exchange if it does not exist yet; if it does, the
        # broker verifies that it is of the expected type.
        self.channel.exchange_declare(exchange=system_exchange, exchange_type='topic')
        # Quality of service: a prefetch count of 1 is requested for this
        # channel.
        self.channel.basic_qos(prefetch_count=1)

    def define_queue(self, queue_name, routing_key=None, exclusive=False):
        """
        Declare a queue and optionally bind it to the module's exchange.

        :param queue_name: name of the queue to declare
        :param routing_key: when truthy, the queue is bound to the exchange
                            with this key; otherwise no binding is made
        :param exclusive: forwarded to ``queue_declare``
        :return: a ``MessageQueue`` handle for the declared queue
        """
        # Creates the queue if needed; it is always declared non-durable.
        self.channel.queue_declare(queue=queue_name, durable=False, exclusive=exclusive)
        if routing_key:
            self.channel.queue_bind(queue=queue_name, exchange=system_exchange, routing_key=routing_key)
        return MessageQueue(self.channel, queue_name)

    def publish(self, routing_key, msg):
        """
        Publish ``msg`` to the module's exchange and echo it to stdout.

        :param routing_key: routing key for the message
        :param msg: message body
        """
        self.channel.basic_publish(body=msg, exchange=system_exchange, routing_key=routing_key)
        print(" [x] Sent %s" % msg)

    def start_consuming(self):
        """
        Run the channel's consume loop.

        Processes I/O events and dispatches ``basic_consume`` callbacks until
        all consumers are cancelled.
        """
        self.channel.start_consuming()

    def close(self):
        """
        Close the channel, then the connection.

        The connection is closed even when closing the channel raises, so a
        failed channel close cannot leak the connection. The channel's
        exception still propagates to the caller.
        """
        try:
            self.channel.close()
        finally:
            self.connection.close()


if __name__ == "__main__":
    # Producer script: publish messages while the "kaola" queue holds fewer
    # than ten, until the counter passes 30.
    channel = MessageChannel()
    try:
        # Declared once up front; message_count() re-declares the queue on
        # every call anyway, so repeating this inside the loop added nothing.
        mq = channel.define_queue("kaola")
        send = 0
        while send <= 30:
            if mq.message_count() < 10:
                channel.publish("", json.dumps({'url': ''}))
                send += 1
            else:
                # Queue is full: pause briefly instead of hammering the broker
                # with back-to-back declare calls. The connection's own sleep
                # is used so it keeps servicing broker I/O while waiting.
                channel.connection.sleep(0.1)
    finally:
        # Release the channel and connection even if a broker call fails.
        channel.close()



consumer & producer

# encoding=utf-8
import pika
import asyncio
import time

from pyppeteer import launch

# Direct pika connection setup, left commented out. This script gets its
# connection through MessageChannel (imported below) instead.
# credentials = pika.PlainCredentials('', '')
# parameters = pika.ConnectionParameters('', 5672, '', credentials)
# connection = pika.BlockingConnection(parameters)
# channel = connection.channel()

# channel.queue_declare(queue='hello')

# NOTE(review): MessageQueue is imported but not referenced by name in this
# script - confirm whether the import is still needed.
from my_daniel import MessageChannel, MessageQueue

# Startup banner. It is printed at import time, before any connection is
# opened or any consumer is registered.
print(' [*] Waiting for messages. To exit press CTRL+C')


def open_url(url):
    """
    Load ``url`` in a pyppeteer-driven browser and return the page HTML.

    A fresh event loop is created for each call, used to run the browser
    session to completion, and closed again afterwards.

    :param url: address to navigate to
    :return: the page content as returned by ``page.content()``
    """
    async def get_data(url):
        browser = await launch(headless=False, userDataDir='./userdata', args=['--disable-infobars'])
        try:
            # Result unused; kept from the original flow.
            # NOTE(review): confirm whether this call is still needed.
            await browser.pages()
            page = await browser.newPage()
            await page.goto(url)
            await page.setViewport({
                'width': 1350,
                'height': 850
            })
            # Make navigator.webdriver report false inside the page.
            await page.evaluate(
                '''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
            # Wait six seconds before reading the page. asyncio.sleep is used
            # instead of time.sleep so the event loop keeps running while
            # waiting.
            await asyncio.sleep(6)
            page_source = await page.content()
            await page.close()
            return page_source
        finally:
            # Always shut the browser down, even when navigation fails.
            await browser.close()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(get_data(url))
    finally:
        # Each call creates its own loop; close it so repeated calls do not
        # pile up open loops, and do not leave a closed loop set as current.
        asyncio.set_event_loop(None)
        loop.close()


def callback(ch, method, properties, body):
    """
    Handle one delivered message: fetch its URL and publish the page source.

    The body is decoded as JSON and must contain a ``url`` key. The page is
    fetched with ``open_url`` and the HTML is published under the
    ``harvested`` routing key through the module-level ``channel``.

    :param ch: channel the message arrived on (unused)
    :param method: delivery method frame (unused)
    :param properties: message properties (unused)
    :param body: JSON-encoded message body
    """
    import json

    payload = json.loads(body)
    target_url = payload['url']
    print(target_url)
    print(" [x] Received %r" % (payload,))
    harvested = {"page_source": open_url(target_url)}
    channel.publish("harvested", json.dumps(harvested))


if __name__ == '__main__':
    # Consumer/producer script: consume from "kaola" and let callback()
    # republish. `channel` stays a module-level name because callback()
    # publishes through it.
    channel = MessageChannel()
    try:
        mq = channel.define_queue("kaola")
        mq.set_consumer(callback)
        channel.start_consuming()
    except KeyboardInterrupt:
        # The startup banner tells the user to exit with CTRL+C, so treat it
        # as a normal shutdown rather than a crash with a traceback.
        pass
    finally:
        # Release the channel and connection however the consume loop ended.
        channel.close()

consumer

# encoding:utf-8
from my_daniel import MessageChannel
def callback(ch, method, properties, body):
    """Print the raw body of a delivered message."""
    print(body)


# Connect and report how many messages are waiting in the "harvested" queue.
receive_channel = MessageChannel()
try:
    # receive_channel.define_queue('harvested')
    mq = receive_channel.define_queue("harvested")

    # Consumer registration is disabled below, so callback() is never invoked
    # by this script.
    # NOTE(review): with no consumer registered, start_consuming() presumably
    # has nothing to dispatch - confirm whether that is intended.
    # mq.set_consumer(callback)
    print(mq.message_count())
    receive_channel.start_consuming()
finally:
    # Release the channel and connection even if a broker call above fails.
    receive_channel.close()
上一篇下一篇

猜你喜欢

热点阅读