In real-world data processing, limited system memory means we cannot load all the data at once, so we have to fetch it in batches. To speed things up, we process the batches with multiple threads. Below is my template for multi-threaded batch data processing.
import threading

# Class to fetch data from the database
class Scheduler():
    def __init__(self):
        self._lock = threading.RLock()
        self.start = 0
        # fetch 10000 rows at a time
        self.step = 10000

    def getdata(self):
        # Lock so that multiple threads cannot hit the database at the
        # same time and fetch duplicate data
        self._lock.acquire()
        # Do the fetch operation (placeholder SQL)
        data = 'select * from table ' \
               'where id between {} and {}'.format(self.start, self.start + self.step)
        # After fetching, move the pointer forward
        self.start += self.step
        self._lock.release()
        return data
# The data-processing logic goes here
def processdata():
    # Fetch a batch from the scheduler instance
    data = scheduler.getdata()
    while data:
        # Process the data:
        # de-duplicate, fill gaps, do arithmetic... as long as there is
        # still data, this thread keeps fetching new batches
        # Then fetch the next batch and loop
        data = scheduler.getdata()
# Create multiple threads; threads_num is the number of threads to create
def threads_scheduler(threads_num):
    threads = []
    for i in range(threads_num):
        # Create a thread
        td = threading.Thread(target=processdata, name='th' + str(i + 1))
        threads.append(td)
    for t in threads:
        # Start the thread
        t.start()
    for t in threads:
        # Wait for the sub-thread to finish
        t.join()
    print('All data has been processed successfully')

if __name__ == '__main__':
    # Instantiate the scheduler and initialize its parameters
    scheduler = Scheduler()
    # Create the threads and start processing data
    threads_scheduler(4)
There are three main parts:
The Scheduler class, which initializes the parameters, and its getdata method, which fetches the data.
The processdata function, which holds the data-processing logic.
The threads_scheduler function, which creates and runs the threads.
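To make the template concrete, here is a runnable sketch with the database replaced by an in-memory list. DATASET, BATCH, and the doubling "processing" step are illustrative stand-ins, not part of the original template.

```python
import threading

# Stand-ins for the database: DATASET plays the table, BATCH the fetch size
DATASET = list(range(100))
BATCH = 10

class Scheduler:
    def __init__(self):
        self._lock = threading.RLock()
        self.start = 0

    def getdata(self):
        with self._lock:                 # prevent duplicate fetches
            data = DATASET[self.start:self.start + BATCH]
            self.start += BATCH
        return data

scheduler = Scheduler()
results = []
results_lock = threading.Lock()

def processdata():
    data = scheduler.getdata()
    while data:                          # an empty batch ends the loop
        with results_lock:               # the "processing" step: double each item
            results.extend(x * 2 for x in data)
        data = scheduler.getdata()

threads = [threading.Thread(target=processdata) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results))  # 100: every row was processed exactly once
```

Because getdata advances the pointer under the lock, each batch is handed to exactly one thread and no row is processed twice.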
I have divided Python multithreading into four parts; below I take you through the key points of each.
Multithreading: threading
This chapter first introduces you to the concept of threads:
Main Thread: when a program starts, the operating system (OS) creates a process and immediately runs a thread, usually called the program's Main Thread. It runs from the very start of the program, and any thread you then create is a child of this main thread.
Sub-threads: the threads created with threading or ThreadPoolExecutor are all sub-threads.
The importance of the main thread is reflected in two aspects: 1. it is the thread that generates other sub-threads; 2. usually it must finish the execution last, such as performing various shutdown actions.
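Both aspects can be seen in a short sketch (the thread name child-1 is arbitrary):

```python
import threading

# A sub-thread created by the main thread is not the main thread itself
def worker():
    print(threading.current_thread() is threading.main_thread())  # False

t = threading.Thread(target=worker, name="child-1")
t.start()
t.join()   # the main thread finishes last, after waiting for its child
print(threading.current_thread() is threading.main_thread())      # True
```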
Without multithreading we could not listen to a song and play a game at the same time; with multithreading we can play the game while background music plays. In this example, starting the game program is one process, and playing the game and listening to the music are two threads.
Python provides the threading module to implement multithreading:
from time import sleep
import threading

def music(music_name):
    for i in range(2):
        print('listening to {}'.format(music_name))
        sleep(1)
    print('music over')

def game(game_name):
    for i in range(2):
        print('playing {}'.format(game_name))
        sleep(3)
    print('game over')

threads = []
t1 = threading.Thread(target=music, args=('inaka',))
threads.append(t1)
t2 = threading.Thread(target=game, args=('flying car',))
threads.append(t2)

if __name__ == '__main__':
    for t in threads:
        # t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()
    print('Main thread finished running')
Thread pooling
Since the system must allocate resources when a thread is created and reclaim them when it terminates, reusing threads cuts this creation/teardown overhead and improves performance. The thread-pool syntax is also more concise than creating and running threads yourself.
Python provides ThreadPoolExecutor to implement thread pools; used as a context manager, it waits for all of its sub-threads to finish by default. It suits scenarios with sudden bursts of many requests, or tasks that need many threads but where each task's actual processing time is short.
from concurrent.futures import ThreadPoolExecutor, as_completed

# fun is the function defined to be run
with ThreadPoolExecutor(max_workers=5) as executor:
    ans = executor.map(fun, [traversal values])
    for res in ans:
        print(res)

with ThreadPoolExecutor(max_workers=5) as executor:
    values = [traversal values]
    ans = [executor.submit(fun, i) for i in values]
    for res in as_completed(ans):
        print(res.result())
Depending on the business scenario: if we need results returned in traversal order, we use the map method; if we want whichever task completes first to return first, we use the submit + as_completed method.
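The difference in ordering can be demonstrated with a runnable sketch; slow_square and its sleep times are made up for this demo so that task 0 finishes last.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

def slow_square(n):
    sleep(0.3 - 0.1 * n)   # n=0 is the slowest task, n=2 the fastest
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    # map: results come back in traversal order, regardless of finish time
    in_order = list(executor.map(slow_square, [0, 1, 2]))

with ThreadPoolExecutor(max_workers=3) as executor:
    # submit + as_completed: results come back as tasks finish
    futures = [executor.submit(slow_square, n) for n in [0, 1, 2]]
    by_completion = [f.result() for f in as_completed(futures)]

print(in_order)       # [0, 1, 4]
print(by_completion)  # typically [4, 1, 0]: the fastest task first
```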
Thread Mutual Exclusion
We call a resource that only one thread may use at a time a critical resource; access to a critical resource must be mutually exclusive. Mutual exclusion is also known as an indirect constraint relationship. Thread mutual exclusion means that while one thread is accessing a critical resource, any other thread that wants that resource must wait; only after the current thread finishes and releases the resource can another thread access it. Locks are what implement thread mutual exclusion.
I compare thread mutual exclusion to using a single-stall toilet: because there is only one stall, only one person can use it at a time. The first person locks the door; if a second person arrives, they must wait outside until the first finishes and unlocks the door. Locks in code work exactly the same way, and the stall here is the critical resource.
Python's threading module provides the Lock class, which has the following methods for acquiring and releasing a lock:
acquire(blocking=True, timeout=-1): acquires the lock; the optional timeout parameter specifies the maximum number of seconds to wait for it.
release(): releases the lock.
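A minimal sketch of the timeout parameter, deliberately trying to re-acquire a Lock that is already held:

```python
import threading

lock = threading.Lock()
lock.acquire()                       # the lock is now held
got_it = lock.acquire(timeout=0.1)   # wait at most 0.1s for the lock
print(got_it)                        # False: the lock was never released
lock.release()
```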
import threading

class Account:
    def __init__(self, card_id, balance):
        # Two attributes: the account ID and the account balance
        self.card_id = card_id
        self.balance = balance

def withdraw(account, money):
    # Acquire the lock
    lock.acquire()
    # The balance covers the amount to withdraw
    if account.balance >= money:
        # Dispense the cash
        print(threading.current_thread().name + " withdrawal succeeded! Dispensed: " + str(money), end=' ')
        # Update the balance
        account.balance -= money
        print("\t balance is: " + str(account.balance))
    else:
        print(threading.current_thread().name + " withdrawal failed! Insufficient balance")
    # Release the lock
    lock.release()

# Create an account with card id 8888 and a balance of 1000
acct = Account("8888", 1000)
# Create a lock in the main thread
lock = threading.Lock()
# Simulate two withdrawals from the same account
threading.Thread(name='Window A', target=withdraw, args=(acct, 800)).start()
threading.Thread(name='Window B', target=withdraw, args=(acct, 800)).start()
The difference between Lock and RLock
Difference 1: Lock is a primitive lock and can be acquired only once by a thread before it is released; RLock is a reentrant lock and can be acquired multiple times by the same thread, i.e. locks can be nested inside locks.
import threading

def main():
    lock.acquire()
    print('First lock')
    lock.acquire()
    print('Second lock')
    lock.release()
    lock.release()

if __name__ == '__main__':
    lock = threading.Lock()
    main()
We will see that the program prints only 'First lock', and then neither terminates nor continues. This is because the second acquire is requested in the same thread before the first is released, so the release statements can never run and the lock can never be freed: a deadlock. If we use RLock instead, the program runs normally and no deadlock occurs.
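Swapping RLock in for Lock in the same nesting pattern, the program runs to completion; a minimal sketch:

```python
import threading

lock = threading.RLock()

def main():
    lock.acquire()
    print('First lock')
    lock.acquire()      # reentrant: succeeds immediately in the same thread
    print('Second lock')
    lock.release()
    lock.release()

main()                  # both lines print, no deadlock
```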
Difference 2: once a Lock is held, it does not belong to a specific thread and can be released from another thread; an RLock can only be released by the thread that acquired it, not by other threads. So with RLock, acquire and release must appear in pairs in the same thread: whoever ties the bell must untie it.
import threading

def main():
    lock.release()
    print("Printed after the sub-thread releases the lock")

if __name__ == '__main__':
    lock = threading.Lock()
    lock.acquire()
    t = threading.Thread(target=main)
    t.start()
We define a Lock in the main thread and acquire it, then create a sub-thread t that runs main to release it. The output appears normally, showing that a lock acquired in the main thread can be released by a sub-thread.
If the Lock above is changed to an RLock, an error is raised. In practice, when designing a program we encapsulate each piece of functionality in its own function, and each function may contain a critical section, so we need RLock.
import threading
import time

def fun_1():
    print('start')
    time.sleep(1)
    lock.acquire()
    print("First lock")
    fun_2()
    lock.release()

def fun_2():
    lock.acquire()
    print("Second lock")
    lock.release()

if __name__ == '__main__':
    lock = threading.RLock()
    t1 = threading.Thread(target=fun_1)
    t2 = threading.Thread(target=fun_2)
    t1.start()
    t2.start()
In a nutshell: Lock cannot be nested, RLock can; a Lock can be released by other threads, while an RLock can only be operated on by the thread that holds it.
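A short sketch verifying the second half of that summary: releasing an RLock from a thread that does not own it raises RuntimeError (the names error and release_from_child are made up for this demo).

```python
import threading

rlock = threading.RLock()
rlock.acquire()          # acquired by the main thread

error = []
def release_from_child():
    try:
        rlock.release()  # not the owning thread
    except RuntimeError as e:
        error.append(e)

t = threading.Thread(target=release_from_child)
t.start()
t.join()
print(type(error[0]).__name__)   # RuntimeError
rlock.release()                  # the owner can still release normally
```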