Rabbit MQ Python Version
2020-09-23 本文已影响0人
阿猫阿狗py
product
# encoding=utf-8
import pika
import json
system_exchange = "exchange"
# Create a new instance of PlainCredentials
credentials = pika.PlainCredentials('', '')
# Create a new ConnectionParameters instance
parameters = pika.ConnectionParameters('', , '', credentials)
class MessageQueue(object):
"""
define queue
"""
def __init__(self, channel, queue_name):
self.channel = channel
self.queue_name = queue_name
self.durable = False
def get_message(self):
# Get a single message from the AMQP broker. Returns a sequence with
# the method frame, message properties, and body.
return self.channel.basic_get(self.queue_name)
def message_count(self):
return self.channel.queue_declare(self.queue_name, durable=self.durable).method.message_count
def set_consumer(self, callback, auto_ack=True):
# Sends the AMQP command Basic.Consume to the broker and binds messages
# for the consumer_tag to the consumer callback. If you do not pass in
# a consumer_tag, one will be automatically generated for you. Returns
# the consumer tag.
self.channel.basic_consume(on_message_callback=callback, queue=self.queue_name, auto_ack=auto_ack)
class MessageChannel(object):
def __init__(self):
# Create a new instance of the Connection object
self.connection = pika.BlockingConnection(parameters)
# Create a new channel with the next available channel number or pass
# in a channel number to use. Must be non-zero if you would like to
# specify but it is recommended that you let Pika manage the channel
# numbers
self.channel = self.connection.channel()
# This method creates an exchange if it does not already exist, and if
# the exchange exists, verifies that it is of the correct and expected
# class.
self.channel.exchange_declare(exchange=system_exchange, exchange_type='topic')
# Specify quality of service. This method requests a specific quality
# of service. The QoS can be specified for the current channel or for all
# channels on the connection. The client can request that messages be sent
# in advance so that when the client finishes processing a message, the
# following message is already held locally, rather than needing to be
# sent down the channel. Prefetching gives a performance improvement
self.channel.basic_qos(prefetch_count=1)
def define_queue(self, queue_name, routing_key=None, exclusive=False):
"""
define queue and routing_key
:param queue_name:
:param routing_key:
:param exclusive:
:return:
"""
# Declare queue, create if needed. This method creates or checks a
# queue. When creating a new queue the client can specify various
# properties that control the durability of the queue and its contents,
# and the level of sharing for the queue
self.channel.queue_declare(queue=queue_name, durable=False, exclusive=exclusive)
if routing_key:
# Bind the queue to the specified exchange
self.channel.queue_bind(queue=queue_name, exchange=system_exchange, routing_key=routing_key)
return MessageQueue(self.channel, queue_name)
def publish(self, routing_key, msg):
# Publish to the channel with the given exchange, routing key, and body.
self.channel.basic_publish(body=msg, exchange=system_exchange, routing_key=routing_key)
print(" [x] Sent %s" % msg)
def start_consuming(self):
# Processes I/O events and dispatches timers and `basic_consume`
# callbacks until all consumers are cancelled.
self.channel.start_consuming()
def close(self):
self.channel.close()
self.connection.close()
if __name__ == "__main__":
channel = MessageChannel()
send = 0
while send <= 30:
mq = channel.define_queue("kaola")
message_count = mq.message_count()
if message_count < 10:
channel.publish("", json.dumps({'url': ''}))
send += 1
channel.close()
customer&product
# encoding=utf-8
import pika
import asyncio
import time
from pyppeteer import launch
# credentials = pika.PlainCredentials('', '')
# parameters = pika.ConnectionParameters('', 5672, '', credentials)
# connection = pika.BlockingConnection(parameters)
# channel = connection.channel()
# channel.queue_declare(queue='hello')
from my_daniel import MessageChannel, MessageQueue
print(' [*] Waiting for messages. To exit press CTRL+C')
def open_url(url):
async def get_data(url):
browser = await launch(headless=False, userDataDir='./userdata', args=['--disable-infobars'])
await browser.pages()
page = await browser.newPage()
await page.goto(url)
await page.setViewport({
'width': 1350,
'height': 850
})
frame = page
await frame.evaluate(
'''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
time.sleep(6)
page_source = await page.content()
await page.close()
await browser.close()
return page_source
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(get_data(url))
def callback(ch, method, properties, body):
import json
body = json.loads(body)
print(body['url'])
print(" [x] Received %r" % (body,))
page_source = open_url(body["url"])
channel.publish("harvested", json.dumps({"page_source": page_source}))
# channel.basic_publish(exchange="exchange",routing_key="harvested", body=json.dumps({"harvested": {"title":"nihao","price":1259,"page_source":str(page_source)}}))
if __name__ == '__main__':
channel = MessageChannel()
mq = channel.define_queue("kaola")
mq.set_consumer(callback)
channel.start_consuming()
channel.close()
customer
# encoding:utf-8
from my_daniel import MessageChannel
receive_channel = MessageChannel()
# receive_channel.define_queue('harvested')
mq = receive_channel.define_queue("harvested")
def callback(ch, method, properties, body):
print(body)
# mq.set_consumer(callback)
print(mq.message_count())
receive_channel.start_consuming()
receive_channel.close()