目標(biāo):在中國的股票市場上盈利,每周都有單個股票盈利2%,月總盈利超過2%
計劃實現(xiàn)方式:Pycharm + Anaconda3 + Python3 + Django + AKShare + MongoDB
目前采用的實現(xiàn)方式:Pycharm + Anaconda3 + Python3 + Flask + AKShare
以后可能會用到 :MongoDB , SQLAlchemy ,baostock ,Tushare
機(jī)器學(xué)習(xí) 會在以后的實踐中逐步用到。
實現(xiàn)方式
上一篇文章寫了采集的方法。本篇文章包含完整代碼和調(diào)用代碼。
采用后臺執(zhí)行的方式。
gupiao.py 如下:
import akshare as ak
import threading
import datetime
import os
from threading import Thread
def get_start():
start_stock_daily()
# 這里就是核心了,調(diào)用這部分就會自動下載 深圳A股 的所有股票的歷史記錄
def start_stock_daily(indicator="A股列表", folder="sz_a", prefix="sz"):
file_path = "D:/work/data/" + folder + "/"
file_path_name = get_sz_a(file_path, indicator)
print(file_path_name)
num = 0
with open(file_path_name, "r", encoding='UTF-8') as stock_lines:
for stock_line in stock_lines.readlines():
num = num + 1
if num == 1:
continue
stock_line_arr = stock_line.split("|")
symbol = prefix + stock_line_arr[5]
print("股票信息=" + symbol + "||" + stock_line_arr[6])
stock_csv = get_stock_daily(file_path, symbol)
print("stock_csv=" + stock_csv)
# 獲得深圳主板A股列表,每天獲取一次不重復(fù)獲取
# file_path 需要全路徑,以 | 進(jìn)行間隔
# indicator 可選參數(shù) "A股列表", "B股列表", "AB股列表", "上市公司列表", "主板", "中小企業(yè)板", "創(chuàng)業(yè)板"
def get_sz_a(file_path, indicator="A股列表"):
today = datetime.datetime.today()
file_name = "sz_a_" + today.strftime('%Y%m%d') + ".csv"
if not os.path.exists(file_path): # 如果路徑不存在則創(chuàng)建
os.makedirs(file_path)
if os.path.exists(file_path + file_name):
print("今日已經(jīng)獲取無需再次獲取," + today.strftime('%Y%m%d'))
return file_path + file_name
stock_info_sz_df = ak.stock_info_sz_name_code(indicator=indicator)
stock_info_sz_df.to_csv(file_path + file_name, sep="|")
print('獲取深圳主板A股列表并存儲為CSV!' + today.strftime('%Y%m%d'))
return file_path + file_name
# 根據(jù)股票代碼獲取股票歷史數(shù)據(jù)
# symbol 股票代碼 需要前綴 sh 上海 sz 深圳,例如:sz300846
def get_stock_daily(file_path, symbol):
stock_zh_a_daily_hfq_df = ak.stock_zh_a_daily(symbol=symbol) # 返回不復(fù)權(quán)的數(shù)據(jù)
file_name = symbol + '.csv'
stock_zh_a_daily_hfq_df.to_csv(file_path + file_name)
return file_path + file_name
調(diào)用下載的部分,注意命名我隨便寫的,請根據(jù)情況自己修改,App.py 如下:
from flask import Flask
import akshare as ak
import gupiao
import datetime
import os
from concurrent.futures import ThreadPoolExecutor
import time
executor = ThreadPoolExecutor(2)
app = Flask(__name__)
@app.route('/test_thread')
def test_thread():
executor.submit(gupiao.get_start)
return "thread is running at background !!!"
if __name__ == '__main__':
app.run()
使用 Flask 框架,生成一個項目,然后創(chuàng)建一個gupiao.py 在 app.py 中調(diào)用,然后運(yùn)行項目。
在瀏覽器里面訪問 http://127.0.0.1:5000/test_thread
就能在后臺看到如圖的畫面,整個深圳A股的下載時間大約在2個小時到3個小時。
股票歷史數(shù)據(jù)
下載到本地如圖
股票歷史數(shù)據(jù)
爬取數(shù)據(jù)部分就完成了,之后就是篩選了。