Knowledge Required: Strong understanding of KQL concepts
Tools required: Microsoft Sentinel
Recently, identity protection has moved sharply up the list when organisations evaluate their biggest cybersecurity risks. The growing popularity of capabilities like SSO (Single Sign-On) means that one compromised account can grant an attacker access to many systems and allow them to move laterally across a technology stack. Notably, throughout my SOC investigations in 2024, the majority of email-based phishing attacks that evaded detection filters leveraged a compromised third-party account, exploiting the existing trust relationship between two companies.
Attackers will commonly ‘reap’ the value of a compromised account immediately; as such, and partly because many attacks are automated, little is often done to disguise the attack. However, a common trait of these attacks is that the adversary uses VPNs to hide their origin, often bouncing between multiple countries. Today’s article focuses on using Kusto with Entra ID logs to detect atypical travel on an account.
Getting our data
We can query the Entra ID sign-in log tables with the statement below:
union AADNonInteractiveUserSignInLogs, SigninLogs
| where ResultType == 0
| sort by UserPrincipalName, TimeGenerated desc
*Note that we used AADNonInteractiveUserSignInLogs to also cover token-based attacks.*
The key concept is the ‘sort by’. This will result in a data sample like the following:
| TimeGenerated | UserPrincipalName | Location | … |
|---|---|---|---|
| 10-11-2024T13:33 | [email protected] | US | … |
| 10-11-2024T13:30 | [email protected] | US | … |
| 10-11-2024T12:30 | [email protected] | UK | … |
| 10-11-2024T14:30 | [email protected] | UK | … |
| 10-11-2024T14:28 | [email protected] | UK | … |
This results in a newest-first list of events for each user, which will be useful for comparison later.
Identifying useful evaluation data
For each record in a table, Kusto has a helpful function called ‘prev’ which allows us to obtain the value a column has in the previous record. Because the rows are sorted newest first, the ‘previous’ record is the row directly above, which holds the more recent sign-in. We’ll use this to check whether the record being evaluated belongs to the same user as the previous record. This can be done with:
| extend IsUserMatch=iff(prev(UserPrincipalName) == UserPrincipalName, true, false)
‘IsUserMatch’ is now created as a new column for each row and can be used to populate other useful comparison data that we will use for detection logic later:
| extend PreviousLocation=iff(IsUserMatch, prev(Location), '')
| extend PreviousIPAddress=iff(IsUserMatch, prev(IPAddress), '')
| extend PreviousId=iff(IsUserMatch, prev(Id), '') // may be useful to correlate results
| extend PreviousTimeGenerated=iff(IsUserMatch, prev(TimeGenerated), datetime(null))
This means that for each sign-in record, if the previous record belongs to the same user, the columns prefixed with ‘Previous’ are populated; otherwise they are left empty.
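To see how ‘prev’ behaves at the boundary between two users, here is a minimal sketch that runs the same steps over a small in-memory table. The user names, timestamps and locations are made up for illustration; a real query reads these columns from the sign-in tables.

```kql
// Hypothetical sample rows standing in for the sign-in tables.
datatable(TimeGenerated: datetime, UserPrincipalName: string, Location: string)
[
    datetime(2024-11-10T13:33:00), "alice@example.com", "US",
    datetime(2024-11-10T13:30:00), "alice@example.com", "US",
    datetime(2024-11-10T12:30:00), "alice@example.com", "NL",
    datetime(2024-11-10T14:30:00), "bob@example.com", "NL",
    datetime(2024-11-10T14:28:00), "bob@example.com", "NL"
]
// sort groups each user's rows together (newest first) and serializes the row set, which prev() requires
| sort by UserPrincipalName, TimeGenerated desc
// false on the first row of each user, because the row above belongs to someone else (or does not exist)
| extend IsUserMatch = prev(UserPrincipalName) == UserPrincipalName
| extend PreviousLocation = iff(IsUserMatch, prev(Location), '')
| extend PreviousTimeGenerated = iff(IsUserMatch, prev(TimeGenerated), datetime(null))
```

With this sample, the first row for each user should have IsUserMatch set to false and empty ‘Previous’ columns, and the only row whose Location differs from PreviousLocation is the 12:30 sign-in from NL.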
Constructing our detection logic
For each record, we have created columns that give us the ability to compare values.
Let’s implement the most obvious comparison to determine a suspicious sign-in:
| where Location != PreviousLocation and IsUserMatch // user has moved
The ‘Location’ field in Entra sign-in logs is an ISO two-letter country code.
Observing a change in country does not always imply an account is compromised. People go on holiday or may have legitimate reasons for travel. In its current state, a change in country is a very broad detection. As we have an IP address, we can use Kusto’s ‘geo_info_from_ip_address()’ function to get a more precise location. Similar to before, we will create a column for the location of the current row’s IP address and another for the previous row’s:
| extend GeoPrevious=todynamic(geo_info_from_ip_address(PreviousIPAddress))
| extend GeoCurrent=todynamic(geo_info_from_ip_address(IPAddress))
Each of the two new columns holds a dictionary value with richer insight into the location of a user, for example:
{"country":"The Netherlands","latitude":52.3824,"longitude":4.8995}
Let’s also leverage another Kusto function, ‘geo_distance_2points()’, which calculates the distance between these two points. It takes 4 parameters: the longitude and latitude of each location, in that order.
| extend MeterDistance=geo_distance_2points(toreal(GeoPrevious['longitude']),toreal(GeoPrevious['latitude']),toreal(GeoCurrent['longitude']),toreal(GeoCurrent['latitude'])) // calculate meter distance; toreal keeps the fractional degrees that toint would truncate
This returns a numerical value: the number of meters between the two points.
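As a quick check of the argument order, the sketch below calls the function on two fixed points. The first pair is the sample GeoIP result shown above; the second is an approximate central London coordinate chosen purely for illustration.

```kql
// Illustrative coordinates only: sample Netherlands GeoIP result and an approximate London point.
print PrevLongitude = 4.8995, PrevLatitude = 52.3824, CurLongitude = -0.1276, CurLatitude = 51.5072
// longitude comes before latitude for each point
| extend MeterDistance = geo_distance_2points(PrevLongitude, PrevLatitude, CurLongitude, CurLatitude)
| extend KilometerDistance = MeterDistance / 1000
```

The result should be roughly 360 km. The coordinates are passed as real values because a whole degree of latitude is about 111 km, so rounding coordinates to integers can shift the result by a large margin.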
This means our query now does the following:
- Detects when a user has switched between countries
- Enriches the IP addresses to get a Geo-IP location
- Calculates the distance between the two points
As the Entra logs contain a timestamp, we can elevate detection further by calculating how fast they’ve travelled between two points using the ‘PreviousTimeGenerated’ column created earlier:
| extend TimeDeltaSeconds=datetime_diff('second', PreviousTimeGenerated, TimeGenerated)
| extend MeterPerSecond = toreal(MeterDistance) / toreal(TimeDeltaSeconds)
The query results now have a complete view of how long it has taken to travel between the two locations. To remove legitimate travel from the results (false positives) we can create an acceptable threshold of how fast a user is allowed to travel between two points. Given that air travel is the fastest form of transport I can think of, I did about 5 minutes of research and felt that travel faster than 300 miles per hour is suspicious. Let’s introduce this threshold into the query:
| extend MilesPerHour = MeterPerSecond * 0.000621371 * 3600 // convert meters to miles (* 0.000621371), then per second to per hour (* 60 secs * 60 mins)
| where MilesPerHour > 300
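As a sanity check on the units: one meter is about 0.000621371 miles and an hour has 3600 seconds, so 1 meter per second is roughly 2.237 miles per hour. The sketch below applies that conversion to an illustrative speed of 250 meters per second (an assumed figure in the range of a cruising airliner, not a value from the logs).

```kql
// 250 m/s is an illustrative input, not data from the sign-in tables.
print MeterPerSecond = 250.0
// meters -> miles (* 0.000621371), per second -> per hour (* 3600)
| extend MilesPerHour = MeterPerSecond * 0.000621371 * 3600
| extend ExceedsThreshold = MilesPerHour > 300
```

This should give a MilesPerHour of roughly 559, which exceeds the 300 mph threshold.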
Testing the logic
A simple way to test is by using the Tor network to log in with an Office365 account. Refreshing the Tor session a few times means the login appears from various exit nodes across multiple countries and generates a Sentinel incident. The complete KQL for the rule can be found at the bottom of this article.
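If a live test is not practical, the logic can also be dry-run against synthetic rows. The sketch below is one way to do that, under some assumptions: the user, timestamps and coordinates are invented, and the Latitude and Longitude columns are supplied directly in place of the ‘geo_info_from_ip_address()’ lookup so the result does not depend on GeoIP data.

```kql
// Synthetic sign-ins for a hypothetical user; coordinates stand in for the GeoIP lookup.
datatable(TimeGenerated: datetime, UserPrincipalName: string, Location: string, Latitude: real, Longitude: real)
[
    datetime(2024-11-10T12:00:00), "alice@example.com", "NL", 52.3824, 4.8995,
    datetime(2024-11-10T12:10:00), "alice@example.com", "US", 40.7128, -74.0060,
    datetime(2024-11-10T12:20:00), "alice@example.com", "US", 40.7128, -74.0060
]
| sort by UserPrincipalName, TimeGenerated desc
| extend IsUserMatch = prev(UserPrincipalName) == UserPrincipalName
| extend PreviousLocation = iff(IsUserMatch, prev(Location), '')
| extend PreviousLatitude = iff(IsUserMatch, prev(Latitude), real(null))
| extend PreviousLongitude = iff(IsUserMatch, prev(Longitude), real(null))
| extend PreviousTimeGenerated = iff(IsUserMatch, prev(TimeGenerated), datetime(null))
| where IsUserMatch and Location != PreviousLocation // country changed for the same user
| extend MeterDistance = geo_distance_2points(PreviousLongitude, PreviousLatitude, Longitude, Latitude)
| extend TimeDeltaSeconds = datetime_diff('second', PreviousTimeGenerated, TimeGenerated)
// meters per second -> miles per hour
| extend MilesPerHour = (MeterDistance / TimeDeltaSeconds) * 0.000621371 * 3600
| where MilesPerHour > 300
```

Only the NL sign-in at 12:00 should survive the filters: it is paired with the US sign-in ten minutes later, which implies a speed far above 300 mph. The two US sign-ins are dropped because the country does not change between them.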
Considerations around the approach
This logic should be used as defence in depth, supported by other detection capabilities and indicators. If a user works part time or does not regularly submit a sign-in log containing a location, it’s possible an attacker could sign in from a new country without exceeding a 300mph travel time.
The rule relies on comparison data, which means it needs to run on a regular interval. Depending on how often the logic is evaluated, it may provide sufficient time for an attacker to exploit a compromised account with delayed detection.
If the query results have location data via longitude and latitude, why still check whether the country has changed? Because the coordinates come from GeoIP data, which is only an approximation of location. It is quite common to see a user log in from their mobile network and from a broadband provider at the same physical location, yet see a geographical distance of hundreds of kilometres between the two. Requiring a change in country helps to reduce these false positives.
Doesn’t Entra Identity Protection already do this? Yes, it does. However, small businesses may not have licensing available to use these features. As Microsoft does not (currently) charge to ingest Entra Sign-in activity to Sentinel, this detection can be done with minimal cost. You can also use this in conjunction with Identity Protection, as you can set the MilesPerHour threshold to match your own risk appetite.
Complete KQL
union AADNonInteractiveUserSignInLogs, SigninLogs
| where ResultType == 0
| sort by UserPrincipalName, TimeGenerated desc
// gather previous log comparison data
| extend IsUserMatch=iff(prev(UserPrincipalName) == UserPrincipalName, true, false)
| extend PreviousLocation=iff(IsUserMatch, prev(Location), '')
| extend PreviousIPAddress=iff(IsUserMatch, prev(IPAddress), '')
| extend PreviousId=iff(IsUserMatch, prev(Id), '') // may be useful to correlate results
| extend PreviousTimeGenerated=iff(IsUserMatch, prev(TimeGenerated), datetime(null))
// start main detection logic
| where Location != PreviousLocation and IsUserMatch // user has moved
| extend GeoPrevious=todynamic(geo_info_from_ip_address(PreviousIPAddress))
| extend GeoCurrent=todynamic(geo_info_from_ip_address(IPAddress))
| extend MeterDistance=geo_distance_2points(toreal(GeoPrevious['longitude']),toreal(GeoPrevious['latitude']),toreal(GeoCurrent['longitude']),toreal(GeoCurrent['latitude'])) // calculate meter distance; toreal keeps the fractional degrees that toint would truncate
| extend TimeDeltaSeconds=datetime_diff('second', PreviousTimeGenerated, TimeGenerated)
| extend MeterPerSecond = toreal(MeterDistance) / toreal(TimeDeltaSeconds)
| extend MilesPerHour = MeterPerSecond * 0.000621371 * 3600 // convert meters to miles (* 0.000621371), then per second to per hour (* 60 secs * 60 mins)
| where MilesPerHour > 300