夜間移動偵測
設備
Raspberry Pi Model B+
- 700MHz CPU, BCM2835 with 512MB RAM
- 40pin extended GPIO
- Full size HDMI
- 4 USB ports
- Micro SD slot
Power
The Pi itself wants about 800 mA at 5 V. The Pi 3 is powered by a +5.1 V supply.
Exactly how much current (mA) the Raspberry Pi requires is dependent on what you connect to it.
Typically, the model B uses between 700-1000mA depending on what peripherals are connected.
The maximum current the Raspberry Pi can draw is 1 A.
- The GPIO pins can draw 50mA safely, distributed across all the pins; an individual GPIO pin can only safely draw 16mA.
- The HDMI port uses 50mA
- the camera module requires 250mA
Raspberry Pi NoIR Camera V2 紅外線攝像模組
- Resolution:800 萬像素
- Still picture resolution: 3280 × 2464
- Max image transfer rate: 1080p30; 720p60; 640x480p90
3w大功率樹莓派夜視攝像頭專用紅外補光燈
3 W / 3.3 V ≈ 0.9 A
RaspberryPi Home Surveillance with only ~150 lines of Python Code
We need to install the related Python libraries. They can all be installed with “pip install”.
- PiCamera For taking frames from Raspi camera
- cv2 All smart computer vision algorithms
- smtplib Sending mails
- Flask Simple website for on/off security
The source code can be downloaded from GitHub.
Installation
sudo apt-get update
sudo apt-get install build-essential python2.7-dev python-setuptools
sudo easy_install pip
sudo pip install picamera
Source
pi_surveillance.py
# python pi_surveillance.py --conf conf.json
# import the necessary packages
# from dropbox.client import DropboxOAuth2FlowNoRedirect
# from dropbox.client import DropboxClient
from picamera.array import PiRGBArray
from picamera import PiCamera
from utils import send_email, TempImage
import argparse
import warnings
import datetime
import json
import time
import cv2
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-c", "--conf", required=True,
    help="path to the JSON configuration file")
ap.add_argument("-d", "--debug", required=False,
    help="debug mode on/off")
args = vars(ap.parse_args())

# filter warnings and load the configuration; the context manager closes
# the file handle as soon as the JSON has been parsed
warnings.filterwarnings("ignore")
with open(args["conf"]) as conf_file:
    conf = json.load(conf_file)

# Dropbox client; stays None unless Dropbox uploading is enabled
client = None

# set debug mode: --debug takes a value, so every non-empty string used to
# switch debugging on -- including "--debug off".  Explicit "off"-style
# values now disable it; any other non-empty value still enables it.
debug_value = args["debug"]
debug_mode = bool(debug_value) and debug_value.strip().lower() not in (
    "off", "0", "false", "no")
if debug_mode:
    print(' > Debug mode is on !')
# check to see if the Dropbox should be used
# NOTE(review): indentation was lost in this listing; the statements that
# follow the `if` are presumably its body -- confirm against the real file.
# NOTE(review): DropboxOAuth2FlowNoRedirect and DropboxClient are only named
# in commented-out imports at the top of this script, so enabling
# "use_dropbox" looks like it would raise NameError -- confirm, and restore
# those imports if Dropbox uploads are needed.
if conf["use_dropbox"]:
# connect to dropbox and start the session authorization process
flow = DropboxOAuth2FlowNoRedirect(conf["dropbox_key"], conf["dropbox_secret"])
# print whatever flow.start() returns (presumably the authorization URL),
# then read the code the user types in, with surrounding whitespace stripped
print "[INFO] Authorize this application: {}".format(flow.start())
authCode = raw_input("Enter auth code here: ").strip()
# finish the authorization and grab the Dropbox client
(accessToken, userID) = flow.finish(authCode)
# replace the None placeholder so the upload code can use `client`
client = DropboxClient(accessToken)
print "[SUCCESS] dropbox account linked"
# open the camera and configure it from the JSON settings, then attach a
# raw RGB capture buffer of the same size
camera = PiCamera()
frame_size = tuple(conf["resolution"])  # (640, 480)
camera.resolution = frame_size
camera.framerate = conf["fps"]
rawCapture = PiRGBArray(camera, size=frame_size)

# give the sensor the configured warm-up time before reading frames
print("[INFO] warming up...")
time.sleep(conf["camera_warmup_time"])

# loop state: running-average background frame, time of the last alert,
# and the count of consecutive frames with motion
avg, motionCounter = None, 0
lastUploaded = datetime.datetime.now()
print('[INFO] talking raspi started !!')
# capture frames from the camera
# NOTE(review): the nesting below was reconstructed from a listing that had
# lost its indentation -- confirm against the real file.
for f in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
    # grab the raw NumPy array representing the image and initialize
    # the timestamp and occupied/unoccupied text
    frame = f.array
    timestamp = datetime.datetime.now()
    text = "Unoccupied"

    ######################################################################
    # COMPUTER VISION
    ######################################################################
    # convert the frame to grayscale and blur it
    # TODO: resize image here into smaller sizes
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, tuple(conf['blur_size']), 0)

    # if the average frame is None, initialize it from the first frame and
    # move on; there is nothing to compare against yet
    if avg is None:
        print("[INFO] starting background model...")
        avg = gray.copy().astype("float")
        rawCapture.truncate(0)
        continue

    # compute the difference between the current frame and the running
    # average, then fold the current frame into that average
    frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(avg))
    cv2.accumulateWeighted(gray, avg, 0.5)

    # threshold the delta image, dilate the thresholded image to fill
    # in holes, then find contours on thresholded image
    thresh = cv2.threshold(frameDelta, conf["delta_thresh"], 255,
        cv2.THRESH_BINARY)[1]
    thresh = cv2.dilate(thresh, None, iterations=2)
    # findContours returns (contours, hierarchy) on OpenCV 2.4/4.x but
    # (image, contours, hierarchy) on 3.x; the contour list is always the
    # second-to-last element, so index it instead of unpacking three values
    cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
        cv2.CHAIN_APPROX_SIMPLE)[-2]

    # loop over the contours
    for c in cnts:
        # if the contour is too small, ignore it
        if cv2.contourArea(c) < conf["min_area"]:
            continue

        # compute the bounding box for the contour, draw it on the frame,
        # and update the text
        (x, y, w, h) = cv2.boundingRect(c)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
        text = "Occupied"

    # draw the text and timestamp on the frame
    ts = timestamp.strftime("%A %d %B %Y %I:%M:%S%p")
    cv2.putText(frame, "Room Status: {}".format(text), (10, 20),
        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
    cv2.putText(frame, ts, (10, frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX,
        0.35, (0, 0, 255), 1)

    ######################################################################
    # LOGIC
    ######################################################################
    # check to see if the room is occupied
    if text == "Occupied":
        # save occupied frame; send_email() attaches /tmp/talkingraspi* files
        cv2.imwrite("/tmp/talkingraspi_{}.jpg".format(motionCounter), frame)

        # check to see if enough time has passed between uploads;
        # total_seconds() is used because timedelta.seconds drops whole days
        elapsed = (timestamp - lastUploaded).total_seconds()
        if elapsed >= conf["min_upload_seconds"]:
            # increment the motion counter
            motionCounter += 1

            # check to see if the number of frames with consistent motion is
            # high enough
            if motionCounter >= int(conf["min_motion_frames"]):
                # check to see if dropbox should be used
                if conf["use_dropbox"]:
                    # write the image to temporary file
                    t = TempImage()
                    cv2.imwrite(t.path, frame)

                    # upload the image to Dropbox; the file handle is closed
                    # and the temporary image removed even if the upload fails
                    print("[UPLOAD] {}".format(ts))
                    path = "{base_path}/{timestamp}.jpg".format(
                        base_path=conf["dropbox_base_path"], timestamp=ts)
                    try:
                        with open(t.path, "rb") as image_file:
                            client.put_file(path, image_file)
                    finally:
                        t.cleanup()

                # send an email if enabled
                if conf["use_email"]:
                    print("[INFO] Sending an alert email!!!")
                    send_email(conf)
                    print("[INFO] waiting {} seconds...".format(conf["camera_warmup_time"]))
                    time.sleep(conf["camera_warmup_time"])
                    print("[INFO] running")

                # update the last uploaded timestamp and reset the motion
                # counter
                lastUploaded = timestamp
                motionCounter = 0

    # otherwise, the room is not occupied
    else:
        motionCounter = 0

    # check to see if the frames should be displayed to screen
    if conf["show_video"]:
        # display the security feed
        cv2.imshow("Security Feed", frame)
        key = cv2.waitKey(1) & 0xFF

        if debug_mode:
            cv2.imshow('Debug blurred gray frame', gray)
            cv2.imshow('Debug threshold frame', thresh)

        # if the `q` key is pressed, break from the loop
        if key == ord("q"):
            break

    # clear the stream in preparation for the next frame
    rawCapture.truncate(0)
Comments: argparse. The argparse module makes it easy to write user-friendly command-line interfaces. Calls to the add_argument() method tell the ArgumentParser how to take the strings on the command line and turn them into objects.
ArgumentParser.add_argument(name or flags...[, action][, nargs][, const][, default][, type][, choices][, required][, help][, metavar][, dest])
Define how a single command-line argument should be parsed. Each parameter has its own more detailed description below, but in short they are:
name or flags - Either a name or a list of option strings, e.g. foo or -f, --foo.
action - The basic type of action to be taken when this argument is encountered at the command line.
nargs - The number of command-line arguments that should be consumed.
const - A constant value required by some action and nargs selections.
default - The value produced if the argument is absent from the command line.
type - The type to which the command-line argument should be converted.
choices - A container of the allowable values for the argument.
required - Whether or not the command-line option may be omitted (optionals only).
help - A brief description of what the argument does.
metavar - A name for the argument in usage messages.
dest - The name of the attribute to be added to the object returned by parse_args().
server.py
import socket
import subprocess
from flask import Flask, render_template
# Flask application; the @app.route decorators in this script register on it
app = Flask(__name__)
# handle of the surveillance subprocess, kept global so the route handlers
# can share it; None until one has been started
proc = None
def get_ip_address():
    """Return the local IP address of the interface used to reach 8.8.8.8.

    A UDP socket is connected to 8.8.8.8:80 and the address of its local
    end is read back with ``getsockname()``.  The socket is always closed,
    including when ``connect`` fails; the error from ``connect`` propagates
    to the caller.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(("8.8.8.8", 80))
        return s.getsockname()[0]
    finally:
        # try/finally rather than `with`: the script targets Python 2,
        # where socket objects are not context managers
        s.close()
@app.route("/")
def hello():
    """Serve the control page rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page
@app.route("/start", methods=['GET', 'POST'])
def start_talkingraspi():
    """Start the surveillance script as a child process.

    The process handle is stored in the module-level ``proc``.  If a child
    started earlier is still alive, no second one is launched: overwriting
    ``proc`` would leave the first process running with no handle to stop
    it.  Returns the string ``"Started!"`` in both cases.
    """
    global proc
    if proc is not None and proc.poll() is None:
        print(" > Talkingraspi is already running (Process {})!".format(proc.pid))
        return "Started!"
    print(" > Start talkingraspi!")
    proc = subprocess.Popen(["python", "pi_surveillance.py", "-c", "conf.json"])
    print(" > Process id {}".format(proc.pid))
    return "Started!"
@app.route("/stop", methods=['GET', 'POST'])
def stop_talkingraspi():
    """Stop the surveillance child process, if there is one.

    Returns the string ``"Stopped!"``.  Calling this before anything was
    started used to fail with AttributeError on ``None.kill()``; it is now
    a no-op.  A child that is still alive is killed and then waited on so
    that it is reaped instead of lingering as a zombie.
    """
    global proc
    print(" > Stop talkingraspi!")
    if proc is None:
        print(" > Talkingraspi is resting")
        return "Stopped!"
    # only signal a process that is still alive; after it has exited the
    # pid may no longer refer to our child
    if proc.poll() is None:
        # subprocess.call(["kill", "-9", "%d" % proc.pid])
        proc.kill()
        proc.wait()
    print(" > Process {} killed!".format(proc.pid))
    return "Stopped!"
@app.route("/status", methods=['GET', 'POST'])
def status_talkingraspi():
    """Report the state of the surveillance child process.

    Returns ``"Resting!"`` when no process has been started, ``"Running!"``
    while ``proc.poll()`` yields None, and ``"Stopped!"`` once it has exited.
    """
    never_started = proc is None
    if never_started:
        print(" > Talkingraspi is resting")
        return "Resting!"

    has_exited = proc.poll() is not None
    if has_exited:
        print(" > Talkingraspi is resting")
        return "Stopped!"

    print(" > Talking raspi is running (Process {})!".format(proc.pid))
    return "Running!"
if __name__ == "__main__":
    # single-argument print(...) call: consistent with the other prints in
    # this script and valid under both Python 2 and Python 3
    print("Connect to http://{}:5555 to controll TalkingRaspi !!".format(get_ip_address()))
    # listen on all interfaces so the page is reachable from other machines
    app.run(host="0.0.0.0", port=5555, debug=False)
utils.py
import json
import smtplib
import uuid
import os
import glob
from os.path import basename
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import COMMASPACE, formatdate
class TempImage:
    """A uniquely named image path that can be deleted again later.

    Constructing the object only builds ``self.path`` as
    ``<basePath>/<uuid4><ext>``; no file is created here.
    """

    def __init__(self, basePath="./", ext=".jpg"):
        # a random UUID keeps concurrent temp images from colliding
        file_name = str(uuid.uuid4()) + ext
        self.path = "{}/{}".format(basePath, file_name)

    def cleanup(self):
        """Delete the file at ``self.path`` from disk."""
        os.remove(self.path)
def send_email(conf):
    """Email a security alert with the captured frames to every recipient.

    One message is built and sent per entry of ``conf['email_address']``.
    Every file matching ``/tmp/talkingraspi*`` is attached.  Messages go
    through ``smtp.gmail.com:587`` using STARTTLS and the credentials
    hard-coded below.

    NOTE(review): the per-recipient nesting was reconstructed from a
    listing that had lost its indentation -- confirm against the real file.

    :param conf: configuration mapping; only ``'email_address'`` (an
        iterable of recipient addresses) is read.
    :raises smtplib.SMTPException: if connecting, logging in or sending
        fails; the connection is closed before the error propagates.
    """
    fromaddr = "address@gmail.com"
    text = 'Hey Someone in Your House!!!!'
    subject = 'Security Alert!!'

    for email_address in conf['email_address']:
        toaddrs = email_address
        print("[INFO] Emailing to {}".format(email_address))

        msg = MIMEMultipart()
        msg['From'] = fromaddr
        msg['To'] = toaddrs
        msg['Date'] = formatdate(localtime=True)
        msg['Subject'] = subject
        msg.attach(MIMEText(text))

        # set attachments
        files = glob.glob("/tmp/talkingraspi*")
        print("[INFO] Number of images attached to email: {}".format(len(files)))
        for f in files:
            with open(f, "rb") as fil:
                part = MIMEApplication(
                    fil.read(),
                    Name=basename(f)
                )
            part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
            msg.attach(part)

        # Credentials (if needed) : EDIT THIS
        # NOTE(review): plain-text credentials in source; consider loading
        # them from configuration or the environment instead.
        username = "gmail_username"
        password = "password"

        # The actual mail send
        server = smtplib.SMTP('smtp.gmail.com:587')
        try:
            server.starttls()
            server.login(username, password)
            server.sendmail(fromaddr, toaddrs, msg.as_string())
            server.quit()
        finally:
            # close() is safe after quit() and also releases the socket
            # when one of the calls above raised
            server.close()
def send_mail(conf, files=None,
              send_from="address@gmail.com", send_to=None,
              subject='Security Alert!!', text='Hey Someone in Your House!!!!',
              server="localhost"):
    """Send one alert message to a list of recipients via a plain SMTP server.

    The original body used ``send_from``, ``send_to``, ``subject``, ``text``
    and ``server`` without defining them, so every call raised NameError.
    They are now optional parameters; ``send_mail(conf)`` and
    ``send_mail(conf, files)`` keep working.

    NOTE(review): the defaults mirror the constants in ``send_email``;
    ``server="localhost"`` is an assumption -- confirm the intended host.

    :param conf: configuration mapping; ``conf['email_address']`` supplies
        the recipients when ``send_to`` is not given.
    :param files: optional iterable of file paths to attach.
    :param send_from: sender address.
    :param send_to: list of recipient addresses.
    :param subject: subject line.
    :param text: plain-text body.
    :param server: SMTP host, passed to ``smtplib.SMTP``.
    :raises TypeError: if the recipients are not a list.
    """
    if send_to is None:
        send_to = conf['email_address']
    # an explicit raise instead of `assert`, which is stripped under -O
    if not isinstance(send_to, list):
        raise TypeError("send_to must be a list of addresses")

    msg = MIMEMultipart()
    msg['From'] = send_from
    msg['To'] = COMMASPACE.join(send_to)
    msg['Date'] = formatdate(localtime=True)
    msg['Subject'] = subject
    msg.attach(MIMEText(text))

    # attach the requested files the same way send_email does
    for path in files or []:
        with open(path, "rb") as fil:
            part = MIMEApplication(fil.read(), Name=basename(path))
        part['Content-Disposition'] = 'attachment; filename="%s"' % basename(path)
        msg.attach(part)

    smtp = smtplib.SMTP(server)
    try:
        smtp.sendmail(send_from, send_to, msg.as_string())
    finally:
        # release the connection even when sending fails
        smtp.close()
留言