3. 每天定时发送天气预报
0. 服务器上安装依赖包¶
# 因为是以服务器上的python环境为基础,所以需要在服务器中安装需要的包!
# 当然,也可以在本地环境测试好后,服务器中安装虚拟环境再安装相关的包测试!比较麻烦
pip install bs4
pip install schedule
pip install requests
# 可以等待第7步制作好requirements.txt后,直接
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
1.需求分析¶
由于我经常不怎么看天气预报,总是在第二天早晨要去上班时发现天气不好,没有提前做好准备
所以通过每天定时发送未来7天天气预报来多次提醒自己,早做准备有备无患!
2.需求函数¶
# 以下是爬取未来7天天气情况的爬虫代码!
import requests
from bs4 import BeautifulSoup
import json
#1. 获取未来7天天气情况,封装成函数
def weather_data():
# 封装headers
headers = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36'}
# 把URL链接赋值到变量url上
url = 'http://www.weather.com.cn/weather/101120501.shtml'
# 发送requests请求,并把响应的内容赋值到变量res中
res = requests.get(url, headers=headers)
res.encoding = 'utf-8'
soup = BeautifulSoup(res.text, 'html.parser')
# 未来7天的网页信息在ul标签中
bsdata=soup.find('ul',{'class':"t clearfix"})
# 未来7天每一天都在一个li标签中,存到列表里
lst=bsdata.find_all('li')
msg=""
# 从列表中获取每天的信息
for line in lst:
date= line.find('h1')
# print(date.string)
weather=line.find('p',class_='wea')
# print(weather.string)
temp=line.find(class_='tem')
# print(temp.text.strip())
wind=line.find(class_='win')
# print(wind.text.strip())
# print("\n")
# print(
# f'日期:{date.string}\t'
# f'天气:{weather.string}\t'
# f'温度:{temp.text.strip()}\t'
# f'风力:{wind.text.strip()}\t'
# )
msg = msg + f"烟台:\n{date.string}温度为:{temp.text.strip()}\n天气为:{weather.string}\n风力为:{wind.text.strip()}\n\n"
return msg
msg = weather_data()
# 测试查看发送的内容样式!
# print(msg)
3. 企业微信发送函数¶
import requests
import json
#2. 发送企业微信
# 自己的企业微信应用《我的通知》相关信息,不可让别人知道!
Secret = "jTuv3jMXjEjU4YHq5Z6sFgy0QuIyEXVjDBAkallr8v4"
corpid = 'ww934ca669c395d3ec'
agentid = 1000007
touser = "ChuPeng" # 向这些用户账户发送。注意:这里的账户需要登录企业微信后台查看!!!
def wechat(Secret,corpid,agentid,touser,msg):
# 1. 我的企业微信信息!
Secret = Secret
corpid = corpid
# 2. 获取token
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}'
getr = requests.get(url=url.format(corpid, Secret))
access_token = getr.json().get('access_token')
# print(access_token)
# 3. 要发送的消息
data = {
"touser": touser, # 向这些用户账户发送。注意:这里的账户需要登录企业微信后台查看!!!
# "toparty" : "PartyID1|PartyID2", # 向这些部门发送
"msgtype": "text",
"agentid": int(agentid), # 应用的 id 号
"text": {
"content": msg
},
"safe": 0
}
# 4. 发送
try:
r = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(access_token),
data=json.dumps(data))
print("发送成功!", r.json())
except:
print("发送失败!")
# 调用函数
4. 设置定时任务¶
import schedule
import time
# 定时任务
def job():
wechat(Secret, corpid, agentid, touser, msg)
# 测试用定时,测试完成注释掉
# schedule.every(20).seconds.do(job)
# 设定为定时执行!
schedule.every().day.at("06:20").do(job)
schedule.every().day.at("07:20").do(job)
schedule.every().day.at("08:20").do(job)
schedule.every().day.at("10:20").do(job)
schedule.every().day.at("16:20").do(job)
while True:
schedule.run_pending()
#注意:必须加上后面的延时(延时多少自己设定),否则会占满CPU资源!!!
time.sleep(10)
5. 组合起来的完整代码¶
import schedule
import time
import requests
from bs4 import BeautifulSoup
import json
#1. 获取未来7天天气情况,封装成函数
def weather_data():
# 封装headers
headers = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36'}
# 把URL链接赋值到变量url上
url = 'http://www.weather.com.cn/weather/101120501.shtml'
# 发送requests请求,并把响应的内容赋值到变量res中
res = requests.get(url, headers=headers)
res.encoding = 'utf-8'
soup = BeautifulSoup(res.text, 'html.parser')
# 未来7天的网页信息在ul标签中
bsdata=soup.find('ul',{'class':"t clearfix"})
# 未来7天每一天都在一个li标签中,存到列表里
lst=bsdata.find_all('li')
msg=""
# 从列表中获取每天的信息
for line in lst:
date= line.find('h1')
# print(date.string)
weather=line.find('p',class_='wea')
# print(weather.string)
temp=line.find(class_='tem')
# print(temp.text.strip())
wind=line.find(class_='win')
# print(wind.text.strip())
# print("\n")
# print(
# f'日期:{date.string}\t'
# f'天气:{weather.string}\t'
# f'温度:{temp.text.strip()}\t'
# f'风力:{wind.text.strip()}\t'
# )
msg = msg + f"烟台:\n{date.string}温度为:{temp.text.strip()}\n天气为:{weather.string}\n风力为:{wind.text.strip()}\n\n"
return msg
#2. 发送企业微信
# 自己的企业微信应用《我的通知》相关信息,不可让别人知道!
Secret = "jTuv3jMXjEjU4YHq5Z6sFgy0QuIyEXVjDBAkallr8v4"
corpid = 'ww934ca669c395d3ec'
agentid = 1000007
touser = "ChuPeng" # 向这些用户账户发送。注意:这里的账户需要登录企业微信后台查看!!!
def wechat(Secret,corpid,agentid,touser,msg):
# 1. 我的企业微信信息!
Secret = Secret
corpid = corpid
# 2. 获取token
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}'
getr = requests.get(url=url.format(corpid, Secret))
access_token = getr.json().get('access_token')
# print(access_token)
# 3. 要发送的消息
data = {
"touser": touser, # 向这些用户账户发送。注意:这里的账户需要登录企业微信后台查看!!!
# "toparty" : "PartyID1|PartyID2", # 向这些部门发送
"msgtype": "text",
"agentid": int(agentid), # 应用的 id 号
"text": {
"content": msg
},
"safe": 0
}
# 4. 发送
try:
r = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(access_token),
data=json.dumps(data))
print("发送成功!", r.json())
except:
print("发送失败!")
# 调用函数
# 定时任务
def job():
msg = weather_data()
wechat(Secret, corpid, agentid, touser, msg)
# 测试用定时,测试完成注释掉
# schedule.every(20).seconds.do(job)
# 设定为定时执行!
schedule.every().day.at("06:20").do(job)
schedule.every().day.at("07:20").do(job)
schedule.every().day.at("08:20").do(job)
schedule.every().day.at("10:20").do(job)
schedule.every().day.at("16:20").do(job)
while True:
schedule.run_pending()
#注意:必须加上后面的延时(延时多少自己设定),否则会占满CPU资源!!!
time.sleep(10)
6. 调试完成后,生成requirements文件¶
# windows上命令行终端生成依赖包文件
pip freeze > piplist.txt
7. 服务器上创建requirements.txt¶
为了防止不必要的麻烦,服务器上直接创建requirements.txt
cd /service/pycharm/schedule
vim requirements.txt
# 只保留必要的包即可!
beautifulsoup4==4.10.0
bs4==0.0.1
requests==2.27.1
schedule==1.1.0
补充:如果服务器上python项目多,为了防止依赖包过多造成麻烦,可以使用虚拟环境!¶
1. 服务器上创建虚拟环境¶
cd /service/pycharm/schedule
virtualenv env
source env/bin/activate
pip install -r requirements.txt
2. 服务器上测试代码¶
python3 1.未来7天天气预报.py

3. 将代码放入后台运行¶
nohup python3 1.未来7天天气预报.py &
# 以后,每天定时就会收到天气预报了!!!
最后更新:
2022-03-23 05:21:05