跳转至

3. 每天定时发送天气预报

0. 服务器上安装依赖包

# 因为是以服务器上的python环境为基础,所以需要在服务器中安装需要的包!
# 当然,也可以在本地环境测试好后,服务器中安装虚拟环境再安装相关的包测试!比较麻烦
pip install bs4
pip install schedule
pip install requests

# 可以等待第7步制作好requirements.txt后,直接
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

1.需求分析

由于我经常不怎么看天气预报,总是在第二天早晨要去上班时发现天气不好,没有提前做好准备
所以通过每天定时发送未来7天天气预报来多次提醒自己,早做准备有备无患!

2.需求函数

# 以下是爬取未来7天天气情况的爬虫代码!

import requests
from bs4 import BeautifulSoup
import json

#1. 获取未来7天天气情况,封装成函数
def weather_data():
    # 封装headers
    headers = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36'}
    # 把URL链接赋值到变量url上
    url = 'http://www.weather.com.cn/weather/101120501.shtml'
    # 发送requests请求,并把响应的内容赋值到变量res中
    res = requests.get(url, headers=headers)
    res.encoding = 'utf-8'

    soup = BeautifulSoup(res.text, 'html.parser')

    # 未来7天的网页信息在ul标签中
    bsdata=soup.find('ul',{'class':"t clearfix"})
    # 未来7天每一天都在一个li标签中,存到列表里
    lst=bsdata.find_all('li')
    msg=""
    # 从列表中获取每天的信息
    for line in lst:
        date= line.find('h1')
        # print(date.string)
        weather=line.find('p',class_='wea')
        # print(weather.string)
        temp=line.find(class_='tem')
        # print(temp.text.strip())
        wind=line.find(class_='win')
        # print(wind.text.strip())
        # print("\n")

        # print(
        #     f'日期:{date.string}\t'
        #     f'天气:{weather.string}\t'
        #     f'温度:{temp.text.strip()}\t'
        #     f'风力:{wind.text.strip()}\t'
        # )
        msg = msg + f"烟台:\n{date.string}温度为:{temp.text.strip()}\n天气为:{weather.string}\n风力为:{wind.text.strip()}\n\n"
    return msg

msg = weather_data()
# 测试查看发送的内容样式!
# print(msg)

3. 企业微信发送函数

import requests
import json

#2. 发送企业微信
# 自己的企业微信应用《我的通知》相关信息,不可让别人知道!
Secret = "jTuv3jMXjEjU4YHq5Z6sFgy0QuIyEXVjDBAkallr8v4"
corpid = 'ww934ca669c395d3ec'
agentid = 1000007
touser = "ChuPeng"  # 向这些用户账户发送。注意:这里的账户需要登录企业微信后台查看!!!

def wechat(Secret,corpid,agentid,touser,msg):
    # 1. 我的企业微信信息!
    Secret = Secret
    corpid = corpid

    # 2. 获取token
    url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}'
    getr = requests.get(url=url.format(corpid, Secret))
    access_token = getr.json().get('access_token')
    # print(access_token)

    # 3. 要发送的消息
    data = {
        "touser": touser,  # 向这些用户账户发送。注意:这里的账户需要登录企业微信后台查看!!!
        # "toparty" : "PartyID1|PartyID2",   # 向这些部门发送
        "msgtype": "text",
        "agentid": int(agentid),  # 应用的 id 号
        "text": {
            "content": msg
        },
        "safe": 0
    }

    # 4. 发送
    try:
        r = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(access_token),
                          data=json.dumps(data))
        print("发送成功!", r.json())
    except:
        print("发送失败!")


#  调用函数

4. 设置定时任务

import schedule
import time

# 定时任务
def job():
    wechat(Secret, corpid, agentid, touser, msg)

# 测试用定时,测试完成注释掉
# schedule.every(20).seconds.do(job)
# 设定为定时执行!
schedule.every().day.at("06:20").do(job)
schedule.every().day.at("07:20").do(job)
schedule.every().day.at("08:20").do(job)
schedule.every().day.at("10:20").do(job)
schedule.every().day.at("16:20").do(job)

while True:
    schedule.run_pending()
    #注意:必须加上后面的延时(延时多少自己设定),否则会占满CPU资源!!!
    time.sleep(10)

5. 组合起来的完整代码

import schedule
import time
import requests
from bs4 import BeautifulSoup
import json

#1. 获取未来7天天气情况,封装成函数
def weather_data():
    # 封装headers
    headers = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36'}
    # 把URL链接赋值到变量url上
    url = 'http://www.weather.com.cn/weather/101120501.shtml'
    # 发送requests请求,并把响应的内容赋值到变量res中
    res = requests.get(url, headers=headers)
    res.encoding = 'utf-8'

    soup = BeautifulSoup(res.text, 'html.parser')

    # 未来7天的网页信息在ul标签中
    bsdata=soup.find('ul',{'class':"t clearfix"})
    # 未来7天每一天都在一个li标签中,存到列表里
    lst=bsdata.find_all('li')
    msg=""
    # 从列表中获取每天的信息
    for line in lst:
        date= line.find('h1')
        # print(date.string)
        weather=line.find('p',class_='wea')
        # print(weather.string)
        temp=line.find(class_='tem')
        # print(temp.text.strip())
        wind=line.find(class_='win')
        # print(wind.text.strip())
        # print("\n")

        # print(
        #     f'日期:{date.string}\t'
        #     f'天气:{weather.string}\t'
        #     f'温度:{temp.text.strip()}\t'
        #     f'风力:{wind.text.strip()}\t'
        # )
        msg = msg + f"烟台:\n{date.string}温度为:{temp.text.strip()}\n天气为:{weather.string}\n风力为:{wind.text.strip()}\n\n"
    return msg



#2. 发送企业微信
# 自己的企业微信应用《我的通知》相关信息,不可让别人知道!
Secret = "jTuv3jMXjEjU4YHq5Z6sFgy0QuIyEXVjDBAkallr8v4"
corpid = 'ww934ca669c395d3ec'
agentid = 1000007
touser = "ChuPeng"  # 向这些用户账户发送。注意:这里的账户需要登录企业微信后台查看!!!

def wechat(Secret,corpid,agentid,touser,msg):
    # 1. 我的企业微信信息!
    Secret = Secret
    corpid = corpid

    # 2. 获取token
    url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}'
    getr = requests.get(url=url.format(corpid, Secret))
    access_token = getr.json().get('access_token')
    # print(access_token)

    # 3. 要发送的消息
    data = {
        "touser": touser,  # 向这些用户账户发送。注意:这里的账户需要登录企业微信后台查看!!!
        # "toparty" : "PartyID1|PartyID2",   # 向这些部门发送
        "msgtype": "text",
        "agentid": int(agentid),  # 应用的 id 号
        "text": {
            "content": msg
        },
        "safe": 0
    }

    # 4. 发送
    try:
        r = requests.post(url="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(access_token),
                          data=json.dumps(data))
        print("发送成功!", r.json())
    except:
        print("发送失败!")


#  调用函数



# 定时任务
def job():
    msg = weather_data()
    wechat(Secret, corpid, agentid, touser, msg)

# 测试用定时,测试完成注释掉
# schedule.every(20).seconds.do(job)
# 设定为定时执行!
schedule.every().day.at("06:20").do(job)
schedule.every().day.at("07:20").do(job)
schedule.every().day.at("08:20").do(job)
schedule.every().day.at("10:20").do(job)
schedule.every().day.at("16:20").do(job)

while True:
    schedule.run_pending()
    #注意:必须加上后面的延时(延时多少自己设定),否则会占满CPU资源!!!
    time.sleep(10)

6. 调试完成后,生成requirements文件

# windows上命令行终端生成依赖包文件
pip freeze > piplist.txt

7. 服务器上创建requirements.txt

为了防止不必要的麻烦,服务器上直接创建requirements.txt
cd /service/pycharm/schedule
vim requirements.txt
# 只保留必要的包即可!
beautifulsoup4==4.10.0
bs4==0.0.1
requests==2.27.1
schedule==1.1.0

补充:如果服务器上python项目多,为了防止依赖包过多造成麻烦,可以使用虚拟环境!

1. 服务器上创建虚拟环境

cd /service/pycharm/schedule
virtualenv env
source env/bin/activate
pip install -r requirements.txt

2. 服务器上测试代码

 python3 1.未来7天天气预报.py
img_7.png

3. 将代码放入后台运行

nohup python3 1.未来7天天气预报.py &

# 以后,每天定时就会收到天气预报了!!!

最后更新: 2022-03-23 05:21:05