使用 Notion API 实现工作流自动化

使用 Notion API 实现工作流自动化

Notion API 简介

Notion API 的推出让 Notion 从一个封闭的笔记应用变成了可编程的平台。通过 API,我们可以:

  • 读取和创建页面内容
  • 管理数据库记录
  • 同步外部数据
  • 构建自定义集成

环境准备

1. 获取 API Token

  1. 访问 Notion Developers
  2. 创建新的 Integration
  3. 获取 Internal Integration Token
  4. 在 Notion 中分享页面给 Integration

2. 安装依赖

# Node.js
npm install @notionhq/client

# Python
pip install notion-client

实战案例

案例 1:自动化日报系统

每天自动创建日报页面,并填充基础信息:

const { Client } = require('@notionhq/client');
const axios = require('axios');

const notion = new Client({ auth: process.env.NOTION_KEY });

async function createDailyReport() {
  const today = new Date();
  const dateStr = today.toLocaleDateString('zh-CN');
  
  // 获取天气信息
  const weather = await getWeather();
  
  // 获取今日待办
  const todos = await getTodayTodos();
  
  // 创建日报页面
  const response = await notion.pages.create({
    parent: { database_id: process.env.DAILY_DB_ID },
    icon: { emoji: "📅" },
    properties: {
      "标题": {
        title: [{ text: { content: `${dateStr} 日报` } }]
      },
      "日期": {
        date: { start: today.toISOString() }
      },
      "天气": {
        rich_text: [{ text: { content: weather } }]
      },
      "状态": {
        select: { name: "进行中" }
      }
    },
    children: [
      {
        object: "block",
        type: "heading_2",
        heading_2: {
          rich_text: [{ text: { content: "今日待办" } }]
        }
      },
      {
        object: "block",
        type: "to_do",
        to_do: {
          rich_text: todos.map(todo => ({
            text: { content: todo }
          })),
          checked: false
        }
      },
      {
        object: "block",
        type: "heading_2",
        heading_2: {
          rich_text: [{ text: { content: "工作记录" } }]
        }
      },
      {
        object: "block",
        type: "paragraph",
        paragraph: {
          rich_text: [{ text: { content: "" } }]
        }
      }
    ]
  });
  
  console.log(`日报创建成功: ${response.url}`);
}

// 定时执行
const cron = require('node-cron');
cron.schedule('0 9 * * *', createDailyReport);

案例 2:内容发布管理

自动将 Notion 中的文章发布到博客:

from notion_client import Client
import markdown
import requests

notion = Client(auth=os.environ["NOTION_KEY"])

def publish_articles():
    # 查询待发布文章
    results = notion.databases.query(
        database_id=os.environ["CONTENT_DB_ID"],
        filter={
            "property": "状态",
            "select": {
                "equals": "待发布"
            }
        }
    )
    
    for page in results["results"]:
        # 获取页面内容
        blocks = notion.blocks.children.list(page["id"])
        
        # 转换为 Markdown
        content = blocks_to_markdown(blocks)
        
        # 发布到博客平台
        response = publish_to_blog({
            "title": page["properties"]["标题"]["title"][0]["text"]["content"],
            "content": content,
            "tags": [tag["name"] for tag in page["properties"]["标签"]["multi_select"]]
        })
        
        if response.success:
            # 更新状态
            notion.pages.update(
                page_id=page["id"],
                properties={
                    "状态": {"select": {"name": "已发布"}},
                    "发布链接": {"url": response.url}
                }
            )

def blocks_to_markdown(blocks):
    markdown_content = ""
    
    for block in blocks["results"]:
        block_type = block["type"]
        
        if block_type == "paragraph":
            text = get_text_from_block(block)
            markdown_content += f"{text}\n\n"
        
        elif block_type == "heading_1":
            text = get_text_from_block(block)
            markdown_content += f"# {text}\n\n"
        
        elif block_type == "heading_2":
            text = get_text_from_block(block)
            markdown_content += f"## {text}\n\n"
        
        elif block_type == "bulleted_list_item":
            text = get_text_from_block(block)
            markdown_content += f"- {text}\n"
        
        elif block_type == "code":
            code = block["code"]["rich_text"][0]["text"]["content"]
            language = block["code"]["language"]
            markdown_content += f"```{language}\n{code}\n```\n\n"
    
    return markdown_content

案例 3:数据分析仪表板

定期分析 Notion 数据并生成报告:

async function generateWeeklyReport() {
  const oneWeekAgo = new Date();
  oneWeekAgo.setDate(oneWeekAgo.getDate() - 7);
  
  // 获取本周任务数据
  const tasks = await notion.databases.query({
    database_id: process.env.TASKS_DB_ID,
    filter: {
      and: [
        {
          property: "创建时间",
          date: {
            after: oneWeekAgo.toISOString()
          }
        }
      ]
    }
  });
  
  // 统计数据
  const stats = {
    total: tasks.results.length,
    completed: tasks.results.filter(t => 
      t.properties["状态"].select?.name === "已完成"
    ).length,
    inProgress: tasks.results.filter(t => 
      t.properties["状态"].select?.name === "进行中"
    ).length,
    byCategory: {}
  };
  
  // 按分类统计
  tasks.results.forEach(task => {
    const category = task.properties["分类"].select?.name || "未分类";
    stats.byCategory[category] = (stats.byCategory[category] || 0) + 1;
  });
  
  // 创建报告页面
  await notion.pages.create({
    parent: { database_id: process.env.REPORTS_DB_ID },
    properties: {
      "标题": {
        title: [{ 
          text: { 
            content: `周报 ${new Date().toLocaleDateString()}` 
          } 
        }]
      }
    },
    children: [
      {
        type: "heading_1",
        heading_1: {
          rich_text: [{ text: { content: "📊 本周数据统计" } }]
        }
      },
      {
        type: "paragraph",
        paragraph: {
          rich_text: [{ 
            text: { 
              content: `任务总数:${stats.total}\n已完成:${stats.completed}\n进行中:${stats.inProgress}\n完成率:${(stats.completed / stats.total * 100).toFixed(1)}%` 
            } 
          }]
        }
      },
      {
        type: "heading_2",
        heading_2: {
          rich_text: [{ text: { content: "📈 分类统计" } }]
        }
      },
      ...Object.entries(stats.byCategory).map(([category, count]) => ({
        type: "bulleted_list_item",
        bulleted_list_item: {
          rich_text: [{ 
            text: { content: `${category}: ${count} 个任务` } 
          }]
        }
      }))
    ]
  });
}

案例 4:双向同步系统

实现 Notion 与 Google Calendar 的双向同步:

from notion_client import Client
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
import datetime

class NotionGoogleSync:
    def __init__(self, notion_token, google_creds):
        self.notion = Client(auth=notion_token)
        self.calendar = build('calendar', 'v3', credentials=google_creds)
        
    def sync_to_google(self):
        """将 Notion 事件同步到 Google Calendar"""
        # 获取 Notion 中的事件
        events = self.notion.databases.query(
            database_id=os.environ["EVENTS_DB_ID"],
            filter={
                "property": "同步状态",
                "checkbox": {
                    "equals": False
                }
            }
        )
        
        for event in events["results"]:
            props = event["properties"]
            
            # 创建 Google Calendar 事件
            google_event = {
                'summary': props["标题"]["title"][0]["text"]["content"],
                'start': {
                    'dateTime': props["开始时间"]["date"]["start"],
                    'timeZone': 'Asia/Shanghai',
                },
                'end': {
                    'dateTime': props["结束时间"]["date"]["start"],
                    'timeZone': 'Asia/Shanghai',
                },
                'description': self.get_page_content(event["id"])
            }
            
            result = self.calendar.events().insert(
                calendarId='primary',
                body=google_event
            ).execute()
            
            # 更新 Notion 同步状态
            self.notion.pages.update(
                page_id=event["id"],
                properties={
                    "同步状态": {"checkbox": True},
                    "Google ID": {"rich_text": [{"text": {"content": result['id']}}]}
                }
            )
    
    def sync_from_google(self):
        """将 Google Calendar 事件同步到 Notion"""
        # 获取未来一周的事件
        now = datetime.datetime.utcnow().isoformat() + 'Z'
        events_result = self.calendar.events().list(
            calendarId='primary',
            timeMin=now,
            maxResults=100,
            singleEvents=True,
            orderBy='startTime'
        ).execute()
        
        events = events_result.get('items', [])
        
        for event in events:
            # 检查是否已存在
            existing = self.check_existing_event(event['id'])
            
            if not existing:
                # 创建 Notion 页面
                self.notion.pages.create({
                    "parent": {"database_id": os.environ["EVENTS_DB_ID"]},
                    "properties": {
                        "标题": {"title": [{"text": {"content": event['summary']}}]},
                        "开始时间": {"date": {"start": event['start'].get('dateTime', event['start'].get('date'))}},
                        "结束时间": {"date": {"start": event['end'].get('dateTime', event['end'].get('date'))}},
                        "Google ID": {"rich_text": [{"text": {"content": event['id']}}]},
                        "同步状态": {"checkbox": True}
                    }
                })

最佳实践

1. 错误处理

async function safeApiCall(apiFunction) {
  const maxRetries = 3;
  let lastError;
  
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await apiFunction();
    } catch (error) {
      lastError = error;
      
      if (error.code === 'rate_limited') {
        // 等待后重试
        await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
      } else if (error.code === 'unauthorized') {
        // 权限错误,直接抛出
        throw error;
      }
    }
  }
  
  throw lastError;
}

2. 批量操作优化

def batch_update_pages(page_updates, batch_size=10):
    """批量更新页面,避免触发速率限制"""
    
    for i in range(0, len(page_updates), batch_size):
        batch = page_updates[i:i + batch_size]
        
        for update in batch:
            notion.pages.update(
                page_id=update["id"],
                properties=update["properties"]
            )
        
        # 批次之间暂停,避免速率限制
        if i + batch_size < len(page_updates):
            time.sleep(1)

3. 数据缓存

const NodeCache = require('node-cache');
const cache = new NodeCache({ stdTTL: 600 }); // 10分钟缓存

async function getCachedDatabase(databaseId) {
  const cacheKey = `db_${databaseId}`;
  let data = cache.get(cacheKey);
  
  if (!data) {
    data = await notion.databases.retrieve({
      database_id: databaseId
    });
    cache.set(cacheKey, data);
  }
  
  return data;
}

部署方案

1. Vercel Functions

// api/notion-sync.js
export default async function handler(req, res) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method not allowed' });
  }
  
  try {
    await syncNotionData();
    res.status(200).json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
}

2. GitHub Actions

name: Notion Sync

on:
  schedule:
    - cron: '0 */6 * * *' # 每6小时执行一次
  workflow_dispatch:

jobs:
  sync:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-node@v2
      - run: npm install
      - run: node sync.js
        env:
          NOTION_KEY: ${{ secrets.NOTION_KEY }}

总结

Notion API 为我们打开了无限可能。通过合理的自动化设计,可以:

  1. 节省时间:自动化重复性工作
  2. 减少错误:程序化操作更准确
  3. 扩展功能:实现 Notion 原生不支持的功能
  4. 数据打通:与其他系统无缝集成

开始探索 Notion API,让你的工作流程更加智能高效!