Building a Threat Hunting Pipeline with Python and Jupyter

By Charles Givre · April 27, 2026

Tags: threat hunting, Python, Jupyter, data science, SOC, machine learning

Most threat hunting guides describe the process abstractly: form a hypothesis, search for evidence, iterate. That framing is accurate but stops short of the part that actually takes time: getting data into a shape you can interrogate, writing code that tests a specific hypothesis, and building something repeatable instead of a one-off notebook you can’t read six weeks later.

This is what a working threat hunting pipeline looks like in Python and Jupyter.

Setting Up the Data Layer

Jupyter notebooks work well for hunt investigations because they combine code, output, and narrative in a single file. The risk is that notebooks turn into unreadable ad-hoc sessions. Use consistent data loading patterns from the start.

Zeek logs include a #fields header. Parse it instead of hardcoding column names:

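import pandas as pd
import numpy as np

def load_zeek_log(path):
    """Load a Zeek TSV log, taking column names from its #fields header."""
    cols = None
    with open(path) as f:
        for line in f:
            if line.startswith('#fields'):
                # Header looks like "#fields<TAB>ts<TAB>uid<TAB>..."; drop the "#fields" token
                cols = line.strip().split('\t')[1:]
                break
    if cols is None:
        raise ValueError(f'no #fields header found in {path}')
    # comment='#' skips the metadata lines, but it also truncates any data row
    # that contains '#', which can matter for free-text logs such as http.log
    return pd.read_csv(path, sep='\t', comment='#', names=cols, na_values=['-', '(empty)'])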

df_conn = load_zeek_log('conn.log')
df_conn['ts'] = pd.to_datetime(df_conn['ts'], unit='s')

for col in ['orig_bytes', 'resp_bytes', 'duration']:
    df_conn[col] = pd.to_numeric(df_conn[col], errors='coerce')

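For Windows Event Log (.evtx), use the evtx package (Python bindings for the Rust evtx parser, installed with pip install evtx), which provides PyEvtxParser. It is a different library from python-evtx, which has a different API: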

import json
from evtx import PyEvtxParser

def load_evtx(path):
    parser = PyEvtxParser(path)
    return pd.json_normalize(
        [json.loads(r['data']) for r in parser.records_json()]
    )

df_security = load_evtx('Security.evtx')

For environments pulling from Sentinel, Splunk, or QRadar, MSTICpy (Microsoft Threat Intelligence Python Security Tools) provides a query interface that works across sources with consistent output DataFrames. The setup cost is real, but it pays off when a hunt hypothesis spans endpoint and network data from different platforms.

Hypothesis: Beaconing Detection

C2 beaconing (MITRE ATT&CK T1071.001) produces regular-interval outbound connections. The statistical signature is low variance in inter-arrival time (IAT) across many connections to the same destination IP.

The coefficient of variation (standard deviation divided by mean) captures this. Randomly timed arrivals (a Poisson process) have a CV near 1, so a CV below 0.25 indicates intervals far more regular than chance would produce. A beacon firing every 60 seconds with minor jitter will score well below that threshold. Legitimate user-driven traffic to the same host rarely does.
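
To make the threshold concrete, here is a small sketch comparing the CV of a simulated beacon against simulated random arrivals, using only the standard library. The traffic is synthetic: the 60-second interval, the ±3 second jitter, and the exponential model for random traffic are assumptions chosen for illustration, not measurements.

```python
import random
import statistics

def coefficient_of_variation(intervals):
    """Sample standard deviation divided by the mean."""
    return statistics.stdev(intervals) / statistics.mean(intervals)

rng = random.Random(42)  # fixed seed so the sketch is repeatable

# Simulated beacon: one connection every 60 s with up to 3 s of jitter either way
beacon_iats = [60 + rng.uniform(-3, 3) for _ in range(200)]

# Simulated random traffic: exponential inter-arrival times, mean 60 s
random_iats = [rng.expovariate(1 / 60) for _ in range(200)]

beacon_cv = coefficient_of_variation(beacon_iats)
random_cv = coefficient_of_variation(random_iats)

print(f'beacon CV: {beacon_cv:.3f}')
print(f'random CV: {random_cv:.3f}')
```

statistics.stdev uses the sample (n-1) standard deviation, which matches the default of pandas' Series.std(), so the numbers are comparable with a pandas-based score.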

def compute_beacon_score(group):
    if len(group) < 15:
        return None
    group = group.sort_values('ts')
    iats = group['ts'].diff().dt.total_seconds().dropna()
    iat_mean = iats.mean()
    if iat_mean == 0:
        return None
    return pd.Series({
        'count': len(group),
        'iat_mean_s': round(iat_mean, 1),
        'iat_cv': round(iats.std() / iat_mean, 3),
        # Originator-side payload bytes only; add resp_bytes to count both directions
        'total_bytes': group['orig_bytes'].sum()
    })

beacon_candidates = (
    df_conn[df_conn['proto'] == 'tcp']
    .groupby('id.resp_h', group_keys=False)
    .apply(compute_beacon_score)
    .dropna()
    .query('count >= 15 and iat_cv < 0.25')
    .sort_values('iat_cv')
)

The total_bytes column narrows the list. Real C2 beacons tend to be small: keepalives averaging a few hundred bytes. A host showing a CV of 0.10 across 50 connections but totaling 20GB is probably a backup job, not a beacon. A host showing a CV of 0.08 across 200 connections totaling 400KB is worth a follow-up.

One known false positive: NTP, telemetry agents, and heartbeat services produce low-CV behavior by design. Filter known-good destinations by ASN or hostname before presenting results to analysts.
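
A minimal sketch of such a filter, using the standard library's ipaddress module. It assumes you maintain the allowlist as CIDR prefixes (standing in for the address ranges an ASN announces) and hostname suffixes. Every network, suffix, and candidate below is a hypothetical placeholder from documentation address space, and is_known_good is an illustrative name.

```python
import ipaddress

# Hypothetical allowlist; replace with ranges and suffixes vetted for your network
KNOWN_GOOD_NETWORKS = [
    ipaddress.ip_network(n) for n in ('192.0.2.0/24', '2001:db8::/32')
]
KNOWN_GOOD_SUFFIXES = ('.ntp.example.org', '.telemetry.example.com')

def is_known_good(ip, hostname=None):
    """True if the destination falls in an allowlisted prefix or hostname suffix."""
    addr = ipaddress.ip_address(ip)
    if any(addr in net for net in KNOWN_GOOD_NETWORKS):
        return True
    return bool(hostname) and hostname.lower().endswith(KNOWN_GOOD_SUFFIXES)

# (destination IP, resolved hostname or None) pairs; made-up examples
candidates = [
    ('192.0.2.10', None),
    ('198.51.100.5', 'time1.ntp.example.org'),
    ('203.0.113.9', 'cdn.unknown.example.net'),
]
to_review = [c for c in candidates if not is_known_good(*c)]
```

Keeping the allowlist in one reviewed file, not scattered across notebooks, makes it easier to audit what the hunt is choosing not to show analysts.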

Hypothesis: Lateral Movement via SMB

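Lateral movement over SMB (MITRE ATT&CK T1021.002) produces Windows Security Event ID 4624 (successful logon) with LogonType 3 (network logon) on each destination host, so the pattern to look for is one account logging on to many distinct destinations. Because each 4624 event is recorded on the host being logged on to, this hunt needs Security logs collected from many hosts, not a single machine’s Security.evtx. Administrators doing their job will appear here. Regular user accounts and service accounts should not.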

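# Event ID 4624 = successful logon; LogonType 3 = network
# pd.to_numeric makes the comparison work whether the parser emitted ints or strings
event_id = pd.to_numeric(df_security['Event.System.EventID'], errors='coerce')
logon_type = pd.to_numeric(df_security['Event.EventData.LogonType'], errors='coerce')
df_4624 = df_security[(event_id == 4624) & (logon_type == 3)].copy()

# Aggregate per account over the full observation window.
# TargetUserName is the account that logged on; SubjectUserName is usually '-' for network logons.
# Event.System.Computer is the host that recorded the event, i.e. the destination,
# so df_security must hold events from many hosts for distinct_hosts to be meaningful.
lateral_candidates = (
    df_4624
    .groupby('Event.EventData.TargetUserName')
    .agg(
        distinct_hosts=('Event.System.Computer', 'nunique'),
        source_ips=('Event.EventData.IpAddress', 'nunique'),
        logon_count=('Event.System.EventRecordID', 'count')
    )
    .query('distinct_hosts > 5 and logon_count > 20')
    .sort_values('distinct_hosts', ascending=False)
)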

Adjust the distinct_hosts threshold based on your environment’s baseline. In a flat network with permissive SMB policies, the threshold may need to be higher. In an environment with strict segmentation, two or three unexpected hosts may be enough to investigate.
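
One way to ground that threshold in a baseline is to take a high percentile of the per-account distinct-host counts from a period you believe was quiet. The sketch below does this with the standard library. The baseline counts, the 95th-percentile choice, and the name baseline_threshold are all illustrative assumptions, not recommendations from a specific dataset.

```python
import statistics

def baseline_threshold(host_counts, percentile=95):
    """Return the given percentile of per-account distinct-host counts."""
    # quantiles(n=100) returns the 99 cut points between percentiles 1..99
    cuts = statistics.quantiles(host_counts, n=100, method='inclusive')
    return cuts[percentile - 1]

# Hypothetical baseline: distinct hosts reached per account during a quiet week
baseline = [1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 5, 6, 9, 14]
threshold = baseline_threshold(baseline)

# Hypothetical counts from the current hunt window
current = [2, 7, 15, 30]
flagged = [n for n in current if n > threshold]
```

Administrator accounts will sit in the tail of the baseline, so it may make sense to compute separate thresholds for admin and non-admin populations instead of one global cutoff.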

Structuring for Reuse

A hunt that runs once and disappears is a missed opportunity. A few patterns that help:

Keep data loading functions in a shared utility module and import them at the top of each notebook. This keeps notebooks focused on hypothesis testing, not boilerplate.

Use a timestamp in the notebook filename: hunt_beaconing_2026-04-27.ipynb. In three months, you want to know when the hunt ran and against which data window.

When a hunt produces findings, export the notebook as an HTML report for sharing:

jupyter nbconvert --to html hunt_beaconing_2026-04-27.ipynb --output-dir=./reports/

For recurring hunts that run against fresh data on a schedule, papermill executes notebooks programmatically with injected parameters. Define the data window as a parameter, and you can run the same hunt notebook daily without opening a browser.
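
A minimal sketch of that pattern follows. It assumes a template notebook named hunt_beaconing.ipynb with a cell tagged "parameters" that defines window_start and window_end; those names, the runs directory, and the one-day window are hypothetical choices. pm.execute_notebook is papermill's entry point for running a notebook with injected parameters, and papermill itself is a third-party install.

```python
from datetime import date, timedelta
from pathlib import Path

def build_run(day, out_dir='runs'):
    """Return (output_path, parameters) for a one-day hunt window ending at `day`."""
    start = day - timedelta(days=1)
    # Dated output filename, so each run records when it executed
    output_path = Path(out_dir) / f'hunt_beaconing_{day.isoformat()}.ipynb'
    parameters = {
        'window_start': start.isoformat(),
        'window_end': day.isoformat(),
    }
    return output_path, parameters

def run_hunt(template='hunt_beaconing.ipynb', day=None):
    """Execute the template notebook for the given day (default: today)."""
    import papermill as pm  # third-party: pip install papermill

    output_path, parameters = build_run(day or date.today())
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # papermill injects the values in a new cell after the one tagged "parameters"
    pm.execute_notebook(template, str(output_path), parameters=parameters)
    return output_path
```

Calling run_hunt() from cron or another scheduler produces one executed, dated notebook per run, which can then be passed to nbconvert for the HTML report.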

What Jupyter Doesn’t Replace

Notebooks are for exploration and documentation. When a hunt hypothesis proves reliable, translate the logic into a production detection. Sigma is the right destination for detection logic that needs to run continuously, that others need to maintain, or that needs to deploy across different SIEM platforms. The notebook is where you prove the hypothesis works; Sigma or your SIEM’s detection language is where it runs in production.

GTK Cyber’s applied data science training covers building, calibrating, and operationalizing threat hunting pipelines with hands-on labs against realistic network and endpoint datasets, including exercises in the exact feature engineering and hypothesis-testing patterns described here.

Frequently Asked Questions

How do you detect C2 beaconing in Zeek connection logs with Python?
Group connections by destination IP and calculate the coefficient of variation (CV) of inter-arrival times: standard deviation divided by mean. A CV below 0.25 across at least 15 connections is consistent with regular-interval beaconing such as C2 keepalives, though it is not proof on its own. Filter by total bytes before surfacing results: real C2 beacons are typically small (a few hundred bytes per connection). A low CV with large total bytes is more likely a backup job. NTP, telemetry agents, and heartbeat services also produce low CVs by design, so filter known-good destinations by ASN or hostname before presenting to analysts.
How do you load Zeek logs into a pandas DataFrame without hardcoding column names?
Parse the #fields header line at runtime. Read the file line by line until you find the line starting with '#fields', split it on tab, and drop the first element to get the column list. Pass those column names to pd.read_csv with a tab separator, comment='#', and na_values set to '-' and '(empty)'. This works across Zeek log types (conn.log, dns.log, http.log) without maintaining a separate column mapping per log type. One caveat: comment='#' also truncates any data row that contains a '#' character, which can affect logs with free-text fields such as http.log.
How do you detect lateral movement via SMB in Windows Security event logs using Python?
Filter Event ID 4624 (successful logon) with LogonType 3 (network logon) from Security logs parsed with the evtx package (PyEvtxParser). Group by the account that logged on (TargetUserName) and count the distinct destination hosts that recorded those logons, along with total logon events. This requires Security logs collected from many hosts, because each 4624 event is recorded on the destination. Accounts reaching more than 5 distinct hosts with more than 20 logons in the observation window are candidates. Adjust the distinct_hosts threshold to your environment: strict segmentation means 2-3 unexpected hosts may warrant investigation; flat networks may require a higher threshold.
When should a threat hunting Jupyter notebook be converted to a Sigma rule?
When the detection hypothesis has proven reliable across multiple hunt runs and needs to run continuously on new data, or when other analysts need to maintain it. The notebook is for proving the hypothesis and tuning thresholds. Once the logic is validated, translate it to Sigma for production deployment. Sigma rules compile to Splunk SPL, Sentinel KQL, Elastic DSL, and other SIEM formats without platform-specific rewriting. Papermill can execute the notebook against fresh data on a schedule as an intermediate step before a full Sigma translation.
What Python library provides a consistent query interface across multiple SIEM platforms for threat hunting?
MSTICpy (Microsoft Threat Intelligence Python Security Tools) supports querying Microsoft Sentinel, Splunk, QRadar, and other data sources with consistent pandas DataFrame output. Hunt code written against one platform transfers to another with minimal changes. The initial credential configuration per environment is real setup work, but it pays off when a hunt hypothesis spans endpoint and network data from different platforms.
