Python基础语法-3

2019-07-24  本文已影响0人  xiaohan_zhang

创建一个TCP socket

# 创建socket套接字
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# ...使用套接字的功能...
print("tcp_socket------")

# 不使用时,关闭套接字
tcp_socket.close()

创建一个UDP socket

# 发送数据
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# ...使用套接字的功能...
udp_socket.sendto(('哈哈哈哈哈哈'.encode('utf-8')), ("192.168.1.105", 8080))

print("udp_socket------")

# 不使用时,关闭套接字
udp_socket.close()

# 接收数据
# 创建一个UDP socket
udp_recv_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定一个本地信息
localaddr = ("", 7788)
udp_recv_socket.bind(localaddr)

# 模拟发送数据
udp_sender_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_sender_socket.sendto(('哈哈哈哈哈哈'.encode('utf-8')), ("192.168.1.105", 7788))
udp_sender_socket.close()

# 接收数据
recv_data = udp_recv_socket.recvfrom(1024)
recv_msg = str(recv_data[0], 'utf-8')
sender_addr = recv_data[1]
print('msg = %s addr = %s' % (recv_msg, sender_addr))
udp_recv_socket.close()
import threading
import time


def sing():
    """ 唱歌5s """
    for i in range(5):
        print("------正在唱歌-------")
        print('thread %s is running...' % threading.current_thread().name)
        time.sleep(1)


def dance():
    """ 跳舞5s """
    for i in range(5):
        print("---------正在跳舞-------")
        print('thread %s is running...' % threading.current_thread().name)
        time.sleep(1)


class MyThread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(1)
            print("I'm " + self.name + ' @ ' + str(i))


def test1(temp):
    temp.append(33)
    print("-------- in test1 temp = %s" % str(temp))


def test2(temp):
    print("-------- in test2 temp = %s" % str(temp))


gl_num = 0
# 创建互斥锁 默认是没有上锁的
mutex = threading.Lock()


def test3(count):
    global gl_num
    # 上锁
    # 如果之前没有被上锁,则上锁成功
    # 如果已经被上锁,则会堵塞在这里,直到锁被解开
    mutex.acquire()

    for i in range(count):
        gl_num += 1

    # 解锁
    mutex.release()
    print("-------in test3 gl_num = %d" % gl_num)


def test4(count):
    global gl_num
    mutex.acquire()
    for i in range(count):
        gl_num += 1
    mutex.release()
    print("-------in test4 gl_num = %d" % gl_num)


def main():
    print("---- 开始 ----:%s" % time.ctime())
    # 方式一
    t1 = threading.Thread(target=sing)  # 没有()
    t2 = threading.Thread(target=dance)
    t1.start()
    t2.start()

    # 方式二
    t3 = MyThread()
    t3.start()

    # target指定这个线程执行哪个函数
    # args指定 调用函数时,传递什么参数
    t4 = threading.Thread(target=test3, args=(100000,))
    t5 = threading.Thread(target=test4, args=(100000,))
    t4.start()
    t5.start()

    print("------- in main thread g_nums = %s" % str(gl_num))

    while True:
        length = len(threading.enumerate())
        print("当前运行的线程数为: %d", length)

        if length <= 1:
            break
        time.sleep(0.5)


if __name__ == "__main__":
    main()
import multiprocessing
import time


def sing():
    """ 唱歌5s """
    for i in range(5):
        print("------正在唱歌-------")
        print('thread %s is running...' % multiprocessing.current_process().name)
        time.sleep(1)


def dance():
    """ 跳舞5s """
    for i in range(5):
        print("---------正在跳舞-------")
        print('thread %s is running...' % multiprocessing.current_process().name)
        time.sleep(1)


def main():
    p1 = multiprocessing.Process(target=sing)
    p2 = multiprocessing.Process(target=dance)
    p1.start()
    p2.start()


if __name__ == "__main__":
    main()
from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("%s开始执行,进程号为%d" % (msg, os.getpid()))
    # random.random()随机生成0-1之间的浮点数
    time.sleep(random.random() * 2)
    t_stop = time.time()
    print("%s执行完毕,耗时%0.2f" % (msg, t_stop - t_start))


po = Pool(3)
for i in range(0, 10):
    # Pool().apply_async(要调用的目标,(传递给目标的参数元组,))
    # 每次循环将用空闲出来的子线程来调用目标
    po.apply_async(worker, (i, ))

print("---------- start -----------")
# 关闭进程池,关闭后,po不再接收新的请求
po.close()
# 等待po中所有子进程执行完成,必须放在close()语句之后
po.join()
print("---------- end -------------")


def main():
    pass


if __name__ == "__main__":
    main()
from collections import Iterable
from collections import Iterator


class Classmate(object):

    def __init__(self):
        self.names = list()
        self.current_num = 0

    def add(self, name):
        self.names.append(name)

    # def __iter__(self):
    #     """ 如果想要一个对象称为 一个可迭代的对象,即可以使用for,那必须实现__iter__方法"""
    #     return ClassInterter(self)
    def __iter__(self):

        return self

    def __next__(self):
        if self.current_num < len(self.names):
            ret = self.names[self.current_num]
            self.current_num += 1
            return ret
        else:
            raise StopIteration # 抛出该异常,for循环自动停止


classmate = Classmate()
classmate.add("Luffy")
classmate.add("Naruto")

print("判断classmeta是否是可以迭代的对象", isinstance(classmate, Iterable))
classmate_iterator = iter(classmate)
print("判断classmate_iterator是否是迭代器", isinstance(classmate_iterator, Iterator))
# print(next(classmate_iterator))


for name in classmate:
    print(name)
# 创建列表时用()
list1 = []
list2 = ()

创建生成器方式二

def create_num(all_num):
    a, b = 0, 1
    current_num = 0
    while current_num < all_num:
        # print(a)
        yield a
        a, b = b, a + b
        current_num += 1


# 如果在调用create_num时,发现这个函数中有yield,那么此时不是调用函数,而是创建一个生成器对象
obj = create_num(10)
for num in obj:
    print(num)
import time


def tash_1():
    while True:
        print("---- 1 -----")
        time.sleep(0.1)
        yield


def tash_2():
    while True:
        print("---- 2 -----")
        time.sleep(0.1)
        yield


def main():
    t1 = tash_1()
    t2 = tash_2()
    while True:
        next(t1)
        next(t2)


if __name__ == "__main__":
    main()
def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        # gevent.sleep(0.5)
        time.sleep(0.5)


def test1():
    # 有耗时操作时需要
    monkey.patch_all()  # 将程序中有耗时操作的代码,换为gevent中的自己实现的模块

    g1 = gevent.spawn(f, 5)
    g2 = gevent.spawn(f, 5)
    g3 = gevent.spawn(f, 5)

    g1.join()
    g2.join()
    g3.join()


def coroutine_work(coroutine_name):
    for i in range(5):
        print(coroutine_name, i)
        time.sleep(1)


def test2():
    monkey.patch_all()

    gevent.joinall([
        gevent.spawn(coroutine_work, "work1"),
        gevent.spawn(coroutine_work, "work2")
    ])


def main():
    # test1()
    test2()


if __name__ == "__main__":
    main()
上一篇下一篇

猜你喜欢

热点阅读